Merge "Catch novaclient up with renaming and other nova changes."

This commit is contained in:
Jenkins 2012-01-25 01:06:56 +00:00 committed by Gerrit Code Review
commit 25bb2a4125
6 changed files with 262 additions and 128 deletions

View File

@ -47,8 +47,9 @@ class Client(object):
self.servers = servers.ServerManager(self)
# extensions
self.dns_domains = floating_ip_dns.FloatingIPDNSDomainManager(self)
self.dns_entries = floating_ip_dns.FloatingIPDNSEntryManager(self)
self.floating_ips = floating_ips.FloatingIPManager(self)
self.floating_ip_dns = floating_ip_dns.FloatingIPDNSManager(self)
self.floating_ip_pools = floating_ip_pools.FloatingIPPoolManager(self)
self.volumes = volumes.VolumeManager(self)
self.volume_snapshots = volume_snapshots.SnapshotManager(self)

View File

@ -18,56 +18,116 @@ import urllib
from novaclient import base
def _quote_zone(zone):
"""Special quoting rule for placing zone names on a url line.
def _quote_domain(domain):
"""Special quoting rule for placing domain names on a url line.
Zone names tend to have .'s in them. Urllib doesn't quote dots,
Domain names tend to have .'s in them. Urllib doesn't quote dots,
but Routes tends to choke on them, so we need an extra level of
by-hand quoting here.
"""
return urllib.quote(zone.replace('.', '%2E'))
return urllib.quote(domain.replace('.', '%2E'))
class FloatingIPDNS(base.Resource):
class FloatingIPDNSDomain(base.Resource):
def delete(self):
self.manager.delete_entry(self.name, self.zone)
self.manager.delete(self.domain)
def create(self):
if self.scope == 'public':
self.manager.create_public(self.domain, self.project)
else:
self.manager.create_private(self.domain, self.availability_zone)
def get(self):
entries = self.manager.domains()
for entry in entries:
if entry.get('domain') == self.domain:
return entry
return None
class FloatingIPDNSManager(base.ManagerWithFind):
resource_class = FloatingIPDNS
class FloatingIPDNSDomainManager(base.ManagerWithFind):
resource_class = FloatingIPDNSDomain
def zones(self):
"""Return the list of available dns zones."""
return self._list("/os-floating-ip-dns", "zones")
def domains(self):
"""Return the list of available dns domains."""
return self._list("/os-floating-ip-dns", "domain_entries")
def get_entries(self, zone, name=None, ip=None):
"""Return a list of entries for the given zone and ip or name."""
qparams = {}
if name:
qparams['name'] = name
if ip:
qparams['ip'] = ip
def create_private(self, fqdomain, availability_zone):
"""Add or modify a private DNS domain."""
body = {'domain_entry':
{'scope': 'private',
'availability_zone': availability_zone}}
params = "?%s" % urllib.urlencode(qparams) if qparams else ""
return self._update('/os-floating-ip-dns/%s' % _quote_domain(fqdomain),
body)
return self._list("/os-floating-ip-dns/%s%s" %
(_quote_zone(zone), params),
def create_public(self, fqdomain, project):
"""Add or modify a public DNS domain."""
body = {'domain_entry':
{'scope': 'public',
'project': project}}
return self._update('/os-floating-ip-dns/%s' % _quote_domain(fqdomain),
body)
def delete(self, fqdomain):
"""Delete the specified domain"""
self._delete("/os-floating-ip-dns/%s" % _quote_domain(fqdomain))
class FloatingIPDNSEntry(base.Resource):
def delete(self):
self.manager.delete(self.name, self.domain)
def create(self):
self.manager.create(self.domain, self.name,
self.ip, self.dns_type)
def get(self):
return self.manager.get(self.domain, self.name)
class FloatingIPDNSEntryManager(base.ManagerWithFind):
resource_class = FloatingIPDNSEntry
def get(self, domain, name):
"""Return a list of entries for the given domain and ip or name."""
return self._get("/os-floating-ip-dns/%s/entries/%s" %
(_quote_domain(domain), name),
"dns_entry")
def get_for_ip(self, domain, ip):
"""Return a list of entries for the given domain and ip or name."""
qparams = {'ip': ip}
params = "?%s" % urllib.urlencode(qparams)
return self._list("/os-floating-ip-dns/%s/entries%s" %
(_quote_domain(domain), params),
"dns_entries")
def create_entry(self, zone, name, ip, dns_type):
def create(self, domain, name, ip, dns_type):
"""Add a new DNS entry."""
body = {'dns_entry':
{'name': name,
'ip': ip,
'dns_type': dns_type,
'zone': zone}}
{'ip': ip,
'dns_type': dns_type}}
return self._create("/os-floating-ip-dns", body, "dns_entry")
return self._update("/os-floating-ip-dns/%s/entries/%s" %
(_quote_domain(domain), name),
body)
def delete_entry(self, zone, name):
"""Delete entry specified by name and zone."""
qparams = {'name': name}
params = "?%s" % urllib.urlencode(qparams) if qparams else ""
def modify_ip(self, domain, name, ip):
"""Add a new DNS entry."""
body = {'dns_entry':
{'ip': ip,
'dns_type': 'A'}}
self._delete("/os-floating-ip-dns/%s%s" %
(_quote_zone(zone), params))
return self._update("/os-floating-ip-dns/%s/entries/%s" %
(_quote_domain(domain), name),
body)
def delete(self, domain, name):
"""Delete entry specified by name and domain."""
self._delete("/os-floating-ip-dns/%s/entries/%s" %
(_quote_domain(domain), name))

View File

@ -1075,45 +1075,81 @@ def do_floating_ip_pool_list(cs, args):
def _print_dns_list(dns_entries):
utils.print_list(dns_entries, ['ip', 'zone', 'name'])
utils.print_list(dns_entries, ['ip', 'name', 'domain'])
def do_dns_zones(cs, args):
"""Print a list of available dns zones."""
zones = cs.floating_ip_dns.zones()
utils.print_list(zones, ['zone'])
def _print_domain_list(domain_entries):
utils.print_list(domain_entries, ['domain', 'scope',
'project', 'availability_zone'])
@utils.arg('zone', metavar='<zone>', help='DNS zone')
def do_dns_domains(cs, args):
"""Print a list of available dns domains."""
domains = cs.dns_domains.domains()
_print_domain_list(domains)
@utils.arg('domain', metavar='<domain>', help='DNS domain')
@utils.arg('--ip', metavar='<ip>', help='ip address', default=None)
@utils.arg('--name', metavar='<name>', help='DNS name', default=None)
def do_dns_list(cs, args):
"""List current DNS entries for zone and ip or zone and name."""
"""List current DNS entries for domain and ip or domain and name."""
if not (args.ip or args.name):
raise exceptions.CommandError(
"You must specify either --ip or --name")
entries = cs.floating_ip_dns.get_entries(args.zone,
ip=args.ip, name=args.name)
_print_dns_list(entries)
if args.name:
entry = cs.dns_entries.get(args.domain, args.name)
_print_dns_list([entry])
else:
entries = cs.dns_entries.get_for_ip(args.domain,
ip=args.ip)
_print_dns_list(entries)
@utils.arg('zone', metavar='<zone>', help='DNS zone')
@utils.arg('name', metavar='<name>', help='DNS name')
@utils.arg('ip', metavar='<ip>', help='ip address')
@utils.arg('name', metavar='<name>', help='DNS name')
@utils.arg('domain', metavar='<domain>', help='DNS domain')
@utils.arg('--type', metavar='<type>', help='dns type (e.g. "A")',
default='A')
def do_dns_create(cs, args):
"""Create a DNS entry for zone, name and ip."""
entries = cs.floating_ip_dns.create_entry(args.zone, args.name,
args.ip, args.type)
_print_dns_list([entries])
"""Create a DNS entry for domain, name and ip."""
entries = cs.dns_entries.create(args.domain, args.name,
args.ip, args.type)
@utils.arg('zone', metavar='<zone>', help='DNS zone')
@utils.arg('domain', metavar='<domain>', help='DNS domain')
@utils.arg('name', metavar='<name>', help='DNS name')
def do_dns_delete(cs, args):
"""Delete the specified DNS entry."""
cs.floating_ip_dns.delete_entry(args.zone, args.name)
cs.dns_entries.delete(args.domain, args.name)
@utils.arg('domain', metavar='<domain>', help='DNS domain')
def do_dns_delete_domain(cs, args):
"""Delete the specified DNS domain."""
cs.dns_domains.delete(args.domain)
@utils.arg('domain', metavar='<domain>', help='DNS domain')
@utils.arg('--availability_zone', metavar='<availability_zone>',
help='Limit access to this domain to instances '
'in the specified availability zone.',
default=None)
def do_dns_create_private_domain(cs, args):
"""Create the specified DNS domain."""
cs.dns_domains.create_private(args.domain,
args.availability_zone)
@utils.arg('domain', metavar='<domain>', help='DNS domain')
@utils.arg('--project', metavar='<project>',
help='Limit access to this domain to users '
'of the specified project.',
default=None)
def do_dns_create_public_domain(cs, args):
"""Create the specified DNS domain."""
cs.dns_domains.create_public(args.domain,
args.project)
def _print_secgroup_rules(rules):

View File

@ -389,44 +389,55 @@ class FakeHTTPClient(base_client.HTTPClient):
return (204, None)
def get_os_floating_ip_dns(self, **kw):
return (205, {'zones':
[{'zone': 'example.org'},
{'zone': 'example.com'}]})
return (205, {'domain_entries':
[{'domain': 'example.org'},
{'domain': 'example.com'}]})
def get_os_floating_ip_dns_zone1(self, **kw):
def get_os_floating_ip_dns_testdomain_entries(self, **kw):
if kw.get('ip'):
return (205, {'dns_entries':
[{'dns_entry':
{'ip': kw.get('ip'),
'name': "host1",
'type': "A",
'zone': 'zone1'}},
'domain': 'testdomain'}},
{'dns_entry':
{'ip': kw.get('ip'),
'name': "host2",
'type': "A",
'zone': 'zone1'}}]})
if kw.get('name'):
return (205, {'dns_entries':
[{'dns_entry':
{'ip': "10.10.10.10",
'name': kw.get('name'),
'type': "A",
'zone': 'zone1'}}]})
'domain': 'testdomain'}}]})
else:
return (404, None)
def post_os_floating_ip_dns(self, body, **kw):
fakes.assert_has_keys(body['dns_entry'],
required=['name', 'ip', 'dns_type', 'zone'])
def get_os_floating_ip_dns_testdomain_entries_testname(self, **kw):
return (205, {'dns_entry':
{'ip': body['dns_entry'].get('ip'),
'name': body['dns_entry'].get('name'),
'type': body['dns_entry'].get('dns_type'),
'zone': body['dns_entry'].get('zone')}})
{'ip': "10.10.10.10",
'name': 'testname',
'type': "A",
'domain': 'testdomain'}})
def delete_os_floating_ip_dns_zone1(self, **kw):
assert 'name' in kw
def put_os_floating_ip_dns_testdomain(self, body, **kw):
if body['domain_entry']['scope'] == 'private':
fakes.assert_has_keys(body['domain_entry'],
required=['availability_zone', 'scope'])
elif body['domain_entry']['scope'] == 'public':
fakes.assert_has_keys(body['domain_entry'],
required=['project', 'scope'])
else:
fakes.assert_has_keys(body['domain_entry'],
required=['project', 'scope'])
return (205, None)
def put_os_floating_ip_dns_testdomain_entries_testname(self, body, **kw):
fakes.assert_has_keys(body['dns_entry'],
required=['ip', 'dns_type'])
return (205, None)
def delete_os_floating_ip_dns_testdomain(self, **kw):
return (200, None)
def delete_os_floating_ip_dns_testdomain_entries_testname(self, **kw):
return (200, None)
#

View File

@ -7,63 +7,70 @@ from tests import utils
cs = fakes.FakeClient()
def _quote_zone(zone):
"""
Zone names tend to have .'s in them. Urllib doesn't quote dots,
but Routes tends to choke on them, so we need an extra level of
by-hand quoting here. This function needs to duplicate the one in
python-novaclient/novaclient/v1_1/floating_ip_dns.py
"""
return urllib.quote(zone.replace('.', '%2E'))
class FloatingIPDNSDomainTest(utils.TestCase):
testdomain = "testdomain"
def test_dns_domains(self):
domainlist = cs.dns_domains.domains()
self.assertEqual(len(domainlist), 2)
for entry in domainlist:
self.assertTrue(isinstance(entry,
floating_ip_dns.FloatingIPDNSDomain))
self.assertEqual(domainlist[1].domain, 'example.com')
def test_create_private_domain(self):
cs.dns_domains.create_private(self.testdomain, 'test_avzone')
cs.assert_called('PUT', '/os-floating-ip-dns/%s' %
self.testdomain)
def test_create_public_domain(self):
cs.dns_domains.create_public(self.testdomain, 'test_project')
cs.assert_called('PUT', '/os-floating-ip-dns/%s' %
self.testdomain)
def test_delete_domain(self):
cs.dns_domains.delete(self.testdomain)
cs.assert_called('DELETE', '/os-floating-ip-dns/%s' %
self.testdomain)
class FloatingIPDNSTest(utils.TestCase):
class FloatingIPDNSEntryTest(utils.TestCase):
testname = "somehostname"
testname = "testname"
testip = "1.2.3.4"
testzone = "zone1"
testdomain = "testdomain"
testtype = "A"
def test_dns_zones(self):
zonelist = cs.floating_ip_dns.zones()
self.assertEqual(len(zonelist), 2)
for entry in zonelist:
self.assertTrue(isinstance(entry, floating_ip_dns.FloatingIPDNS))
self.assertEqual(zonelist[1].zone, 'example.com')
def test_get_dns_entries_by_ip(self):
entries = cs.floating_ip_dns.get_entries(self.testzone, ip=self.testip)
entries = cs.dns_entries.get_for_ip(self.testdomain, ip=self.testip)
self.assertEqual(len(entries), 2)
for entry in entries:
self.assertTrue(isinstance(entry, floating_ip_dns.FloatingIPDNS))
self.assertTrue(isinstance(entry,
floating_ip_dns.FloatingIPDNSEntry))
self.assertEqual(entries[1].dns_entry['name'], 'host2')
self.assertEqual(entries[1].dns_entry['ip'], self.testip)
def test_get_dns_entries_by_name(self):
entries = cs.floating_ip_dns.get_entries(self.testzone,
name=self.testname)
self.assertEqual(len(entries), 1)
self.assertTrue(isinstance(entries[0], floating_ip_dns.FloatingIPDNS))
self.assertEqual(entries[0].dns_entry['name'], self.testname)
def test_get_dns_entry_by_name(self):
entry = cs.dns_entries.get(self.testdomain,
self.testname)
self.assertTrue(isinstance(entry, floating_ip_dns.FloatingIPDNSEntry))
self.assertEqual(entry.name, self.testname)
def test_create_entry(self):
response = cs.floating_ip_dns.create_entry(self.testzone,
self.testname,
self.testip,
self.testtype)
self.assertEqual(response.name, self.testname)
self.assertEqual(response.ip, self.testip)
self.assertEqual(response.zone, self.testzone)
self.assertEqual(response.type, self.testtype)
cs.dns_entries.create(self.testdomain,
self.testname,
self.testip,
self.testtype)
cs.assert_called('PUT', '/os-floating-ip-dns/%s/entries/%s' %
(self.testdomain, self.testname))
def test_delete_entry(self):
response = cs.floating_ip_dns.delete_entry(self.testzone,
self.testname)
cs.assert_called('DELETE', '/os-floating-ip-dns/%s?name=%s' %
(self.testzone, self.testname))
cs.dns_entries.delete(self.testdomain, self.testname)
cs.assert_called('DELETE', '/os-floating-ip-dns/%s/entries/%s' %
(self.testdomain, self.testname))

View File

@ -300,25 +300,44 @@ class ShellTest(utils.TestCase):
self.assert_called('DELETE', '/servers/1234/metadata/key2', pos=-2)
def test_dns_create(self):
self.run_command('dns-create zone1 testname 192.168.1.1')
self.assert_called('POST', '/os-floating-ip-dns')
self.run_command('dns-create 192.168.1.1 testname testdomain')
self.assert_called('PUT',
'/os-floating-ip-dns/testdomain/entries/testname')
self.run_command('dns-create zone1 tn 192.168.1.1 --type A')
self.assert_called('POST', '/os-floating-ip-dns')
self.run_command('dns-create 192.168.1.1 testname testdomain --type A')
self.assert_called('PUT',
'/os-floating-ip-dns/testdomain/entries/testname')
def test_dns_create_public_domain(self):
self.run_command('dns-create-public-domain testdomain '
'--project test_project')
self.assert_called('PUT', '/os-floating-ip-dns/testdomain')
def test_dns_create_private_domain(self):
self.run_command('dns-create-private-domain testdomain '
'--availability_zone av_zone')
self.assert_called('PUT', '/os-floating-ip-dns/testdomain')
def test_dns_delete(self):
self.run_command('dns-delete zone1 testname')
self.assert_called('DELETE', '/os-floating-ip-dns/zone1?name=testname')
self.run_command('dns-delete testdomain testname')
self.assert_called('DELETE',
'/os-floating-ip-dns/testdomain/entries/testname')
def test_dns_delete_domain(self):
self.run_command('dns-delete-domain testdomain')
self.assert_called('DELETE', '/os-floating-ip-dns/testdomain')
def test_dns_list(self):
self.run_command('dns-list zone1 --ip 192.168.1.1')
self.assert_called('GET', '/os-floating-ip-dns/zone1?ip=192.168.1.1')
self.run_command('dns-list testdomain --ip 192.168.1.1')
self.assert_called('GET',
'/os-floating-ip-dns/testdomain/entries?ip=192.168.1.1')
self.run_command('dns-list zone1 --name testname')
self.assert_called('GET', '/os-floating-ip-dns/zone1?name=testname')
self.run_command('dns-list testdomain --name testname')
self.assert_called('GET',
'/os-floating-ip-dns/testdomain/entries/testname')
def test_dns_zones(self):
self.run_command('dns-zones')
def test_dns_domains(self):
self.run_command('dns-domains')
self.assert_called('GET', '/os-floating-ip-dns')
def test_usage_list(self):