fix pep8 complaints.

make pep8 is now silent on precise's pep8 (0.6.1-2ubuntu2).
Scott Moser 2012-08-22 14:12:32 -04:00
parent 899f24ec08
commit 8c8c128192
31 changed files with 157 additions and 148 deletions
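
Most of the changes below are mechanical: one-line docstrings lose their leading space and gain a trailing period, bare TODO comments become TODO(NAME), arithmetic operators in slices get surrounding spaces, and import lines are reordered (so both the old and the new ordering show up in the hunks). A rough sketch of the "after" style, with made-up names not taken from cloud-init:

    def split_pairs(data):
        """Split a string into two-character chunks."""
        # TODO(someone) decide if a generator would be nicer here?
        pairs = []
        for i in xrange(0, len(data), 2):
            pairs.append(data[i:i + 2])
        return pairs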

View File

@ -16,8 +16,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from cloudinit import util
from cloudinit.settings import PER_INSTANCE
from cloudinit import util
frequency = PER_INSTANCE
@ -50,7 +50,7 @@ def handle(_name, cfg, cloud, log, _args):
def write_apt_snippet(cloud, setting, log, f_name):
""" Writes f_name with apt pipeline depth 'setting' """
"""Writes f_name with apt pipeline depth 'setting'."""
file_contents = APT_PIPE_TPL % (setting)

View File

@ -153,7 +153,7 @@ def rename_apt_lists(omirror, new_mirror, lists_d="/var/lib/apt/lists"):
return
olen = len(oprefix)
for filename in glob.glob("%s_*" % oprefix):
# TODO use the cloud.paths.join...
# TODO(harlowja) use the cloud.paths.join...
util.rename(filename, "%s%s" % (nprefix, filename[olen:]))
@ -232,7 +232,7 @@ def add_sources(cloud, srclist, template_params=None):
def find_apt_mirror(cloud, cfg):
""" find an apt_mirror given the cloud and cfg provided """
"""find an apt_mirror given the cloud and cfg provided."""
mirror = None

View File

@ -20,8 +20,8 @@
import os
from cloudinit import util
from cloudinit.settings import PER_ALWAYS
from cloudinit import util
frequency = PER_ALWAYS

View File

@ -20,8 +20,8 @@
import os
from cloudinit import util
from cloudinit.settings import PER_ALWAYS
from cloudinit import util
frequency = PER_ALWAYS
@ -44,5 +44,5 @@ def handle(name, _cfg, cloud, log, args):
try:
util.subp(cmd)
except Exception as e:
# TODO, use log exception from utils??
# TODO(harlowja), use log exception from utils??
log.warn("Emission of upstart event %s failed due to: %s", n, e)

View File

@ -48,7 +48,8 @@ def handle(name, cfg, cloud, log, _args):
# Create object for reading puppet.conf values
puppet_config = helpers.DefaultingConfigParser()
# Read puppet.conf values from original file in order to be able to
# mix the rest up. First clean them up (TODO is this really needed??)
# mix the rest up. First clean them up
# (TODO(harlowja) is this really needed??)
cleaned_lines = [i.lstrip() for i in contents.splitlines()]
cleaned_contents = '\n'.join(cleaned_lines)
puppet_config.readfp(StringIO(cleaned_contents),
@ -80,7 +81,7 @@ def handle(name, cfg, cloud, log, _args):
for (o, v) in cfg.iteritems():
if o == 'certname':
# Expand %f as the fqdn
# TODO should this use the cloud fqdn??
# TODO(harlowja) should this use the cloud fqdn??
v = v.replace("%f", socket.getfqdn())
# Expand %i as the instance id
v = v.replace("%i", cloud.get_instance_id())

View File

@ -22,8 +22,8 @@ import os
import stat
import time
from cloudinit import util
from cloudinit.settings import PER_ALWAYS
from cloudinit import util
frequency = PER_ALWAYS
@ -72,12 +72,12 @@ def handle(name, cfg, cloud, log, args):
log.debug("Skipping module named %s, resizing disabled", name)
return
# TODO is the directory ok to be used??
# TODO(harlowja) is the directory ok to be used??
resize_root_d = util.get_cfg_option_str(cfg, "resize_rootfs_tmp", "/run")
resize_root_d = cloud.paths.join(False, resize_root_d)
util.ensure_dir(resize_root_d)
# TODO: allow what is to be resized to be configurable??
# TODO(harlowja): allow what is to be resized to be configurable??
resize_what = cloud.paths.join(False, "/")
with util.ExtendedTemporaryFile(prefix="cloudinit.resizefs.",
dir=resize_root_d, delete=True) as tfh:
@ -136,5 +136,5 @@ def do_resize(resize_cmd, log):
raise
tot_time = time.time() - start
log.debug("Resizing took %.3f seconds", tot_time)
# TODO: Should we add a fsck check after this to make
# TODO(harlowja): Should we add a fsck check after this to make
# sure we didn't corrupt anything?

View File

@ -37,9 +37,9 @@
import os
from cloudinit.settings import PER_INSTANCE
from cloudinit import url_helper as uhelp
from cloudinit import util
from cloudinit.settings import PER_INSTANCE
from urlparse import parse_qs
@ -72,7 +72,7 @@ def handle(name, _cfg, cloud, log, _args):
captured_excps = []
# These will eventually be then ran by the cc_scripts_user
# TODO: maybe this should just be a new user data handler??
# TODO(harlowja): maybe this should just be a new user data handler??
# Instead of a late module that acts like a user data handler?
scripts_d = cloud.get_ipath_cur('scripts')
urls = mdict[MY_HOOKNAME]

View File

@ -18,11 +18,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import glob
import os
from cloudinit import util
from cloudinit import ssh_util
from cloudinit import util
DISABLE_ROOT_OPTS = ("no-port-forwarding,no-agent-forwarding,"
"no-X11-forwarding,command=\"echo \'Please login as the user \\\"$USER\\\" "
@ -76,7 +76,7 @@ def handle(_name, cfg, cloud, log, _args):
pair = (KEY_2_FILE[priv][0], KEY_2_FILE[pub][0])
cmd = ['sh', '-xc', KEY_GEN_TPL % pair]
try:
# TODO: Is this guard needed?
# TODO(harlowja): Is this guard needed?
with util.SeLinuxGuard("/etc/ssh", recursive=True):
util.subp(cmd, capture=False)
log.debug("Generated a key for %s from %s", pair[0], pair[1])
@ -94,7 +94,7 @@ def handle(_name, cfg, cloud, log, _args):
if not os.path.exists(keyfile):
cmd = ['ssh-keygen', '-t', keytype, '-N', '', '-f', keyfile]
try:
# TODO: Is this guard needed?
# TODO(harlowja): Is this guard needed?
with util.SeLinuxGuard("/etc/ssh", recursive=True):
util.subp(cmd, capture=False)
except:

View File

@ -28,7 +28,7 @@ from cloudinit import util
def _split_hash(bin_hash):
split_up = []
for i in xrange(0, len(bin_hash), 2):
split_up.append(bin_hash[i:i+2])
split_up.append(bin_hash[i:i + 2])
return split_up

View File

@ -18,8 +18,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from cloudinit import util
from cloudinit import templater
from cloudinit import util
from cloudinit.settings import PER_ALWAYS

View File

@ -20,8 +20,8 @@
import os
from cloudinit import util
from cloudinit.settings import PER_ALWAYS
from cloudinit import util
frequency = PER_ALWAYS

View File

@ -19,8 +19,8 @@
import base64
import os
from cloudinit import util
from cloudinit.settings import PER_INSTANCE
from cloudinit import util
frequency = PER_INSTANCE
@ -46,7 +46,7 @@ def canonicalize_extraction(encoding_type, log):
return ['application/x-gzip']
if encoding_type in ['gz+base64', 'gzip+base64', 'gz+b64', 'gzip+b64']:
return ['application/base64', 'application/x-gzip']
# Yaml already encodes binary data as base64 if it is given to the
# Yaml already encodes binary data as base64 if it is given to the
# yaml file as binary, so those will be automatically decoded for you.
# But the above b64 is just for people that are more 'comfortable'
# specifing it manually (which might be a possiblity)
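
The comment above is easy to confirm with PyYAML alone; a minimal sketch (the 'content' key is only illustrative):

    import yaml

    # An explicit !!binary node is base64-decoded to raw bytes on load, so
    # gzipped data handed to YAML as binary needs no extra 'b64' marker.
    doc = "content: !!binary |\n  aGVsbG8=\n"
    print yaml.safe_load(doc)    # {'content': 'hello'}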

View File

@ -28,7 +28,7 @@ from cloudinit import importer
from cloudinit import log as logging
from cloudinit import util
# TODO: Make this via config??
# TODO(harlowja): Make this via config??
IFACE_ACTIONS = {
'up': ['ifup', '--all'],
'down': ['ifdown', '--all'],

View File

@ -69,7 +69,7 @@ class Distro(distros.Distro):
self.package_command('install', pkglist)
def _write_network(self, settings):
# TODO fix this... since this is the ubuntu format
# TODO(harlowja) fix this... since this is the ubuntu format
entries = translate_network(settings)
LOG.debug("Translated ubuntu style network settings %s into %s",
settings, entries)
@ -258,7 +258,7 @@ class QuotingConfigObj(ConfigObj):
# This is a util function to translate a ubuntu /etc/network/interfaces 'blob'
# to a rhel equiv. that can then be written to /etc/sysconfig/network-scripts/
# TODO remove when we have python-netcf active...
# TODO(harlowja) remove when we have python-netcf active...
def translate_network(settings):
# Get the standard cmd, args from the ubuntu format
entries = []
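
For context, the 'blob' being translated is standard Debian/Ubuntu /etc/network/interfaces syntax; a hypothetical input of the kind translate_network() expects is shown below (the output structure is not visible in this hunk, so it is not reproduced):

    ENI_EXAMPLE = "\n".join([
        "auto eth0",
        "iface eth0 inet static",
        "    address 10.0.0.10",
        "    netmask 255.255.255.0",
        "    gateway 10.0.0.1",
    ])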

View File

@ -133,7 +133,7 @@ def walker_handle_handler(pdata, _ctype, _filename, payload):
modfname = os.path.join(pdata['handlerdir'], "%s" % (modname))
if not modfname.endswith(".py"):
modfname = "%s.py" % (modfname)
# TODO: Check if path exists??
# TODO(harlowja): Check if path exists??
util.write_file(modfname, payload, 0600)
handlers = pdata['handlers']
try:

View File

@ -43,7 +43,7 @@ class ShellScriptPartHandler(handlers.Handler):
def _handle_part(self, _data, ctype, filename, payload, _frequency):
if ctype in handlers.CONTENT_SIGNALS:
# TODO: maybe delete existing things here
# TODO(harlowja): maybe delete existing things here
return
filename = util.clean_filename(filename)

View File

@ -21,8 +21,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import logging.handlers
import logging.config
import logging.handlers
import collections
import os

View File

@ -49,8 +49,7 @@ class DataSourceCloudStack(sources.DataSource):
self.metadata_address = "http://%s/" % (gw_addr)
def get_default_gateway(self):
""" Returns the default gateway ip address in the dotted format
"""
"""Returns the default gateway ip address in the dotted format."""
lines = util.load_file("/proc/net/route").splitlines()
for line in lines:
items = line.split("\t")

View File

@ -124,12 +124,12 @@ class NonConfigDriveDir(Exception):
def find_cfg_drive_device():
""" Get the config drive device. Return a string like '/dev/vdb'
or None (if there is no non-root device attached). This does not
check the contents, only reports that if there *were* a config_drive
attached, it would be this device.
Note: per config_drive documentation, this is
"associated as the last available disk on the instance"
"""Get the config drive device. Return a string like '/dev/vdb'
or None (if there is no non-root device attached). This does not
check the contents, only reports that if there *were* a config_drive
attached, it would be this device.
Note: per config_drive documentation, this is
"associated as the last available disk on the instance"
"""
# This seems to be for debugging??
@ -160,7 +160,7 @@ def read_config_drive_dir(source_dir):
string populated. If not a valid dir, raise a NonConfigDriveDir
"""
# TODO: fix this for other operating systems...
# TODO(harlowja): fix this for other operating systems...
# Ie: this is where https://fedorahosted.org/netcf/ or similar should
# be hooked in... (or could be)
found = {}

View File

@ -326,7 +326,7 @@ class Init(object):
'paths': self.paths,
'datasource': self.datasource,
}
# TODO Hmmm, should we dynamically import these??
# TODO(harlowja) Hmmm, should we dynamically import these??
def_handlers = [
cc_part.CloudConfigPartHandler(**opts),
ss_part.ShellScriptPartHandler(**opts),
@ -519,7 +519,7 @@ class Modules(object):
" but not on %s distro. It may or may not work"
" correctly."), name, worked_distros, d_name)
# Use the configs logger and not our own
# TODO: possibly check the module
# TODO(harlowja): possibly check the module
# for having a LOG attr and just give it back
# its own logger?
func_args = [name, self.cfg,

View File

@ -23,9 +23,9 @@
import os
import email
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from cloudinit import handlers
from cloudinit import log as logging
@ -159,7 +159,7 @@ class UserDataProcessor(object):
if isinstance(ent, (str, basestring)):
ent = {'content': ent}
if not isinstance(ent, (dict)):
# TODO raise?
# TODO(harlowja) raise?
continue
content = ent.get('content', '')

View File

@ -24,8 +24,8 @@
from StringIO import StringIO
import copy as obj_copy
import contextlib
import copy as obj_copy
import errno
import glob
import grp
@ -317,8 +317,9 @@ def multi_log(text, console=True, stderr=True,
else:
log.log(log_level, text)
def is_ipv4(instr):
""" determine if input string is a ipv4 address. return boolean"""
"""determine if input string is a ipv4 address. return boolean."""
toks = instr.split('.')
if len(toks) != 4:
return False
@ -826,12 +827,12 @@ def get_cmdline_url(names=('cloud-config-url', 'url'),
def is_resolvable(name):
""" determine if a url is resolvable, return a boolean
"""determine if a url is resolvable, return a boolean
This also attempts to be resilent against dns redirection.
Note, that normal nsswitch resolution is used here. So in order
to avoid any utilization of 'search' entries in /etc/resolv.conf
we have to append '.'.
we have to append '.'.
The top level 'invalid' domain is invalid per RFC. And example.com
should also not exist. The random entry will be resolved inside
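
The trailing-dot detail above can be illustrated with plain socket calls, independent of cloudinit.util:

    import socket

    # Appending '.' makes the name fully qualified, so resolv.conf 'search'
    # domains are never tried; a name under the reserved 'invalid' TLD then
    # reliably fails to resolve unless DNS redirection is in play.
    try:
        socket.getaddrinfo("does-not-exist.invalid.", None)
        print "unexpectedly resolvable (dns redirection?)"
    except socket.gaierror:
        print "not resolvable, as expected"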
@ -847,7 +848,7 @@ def is_resolvable(name):
try:
result = socket.getaddrinfo(iname, None, 0, 0,
socket.SOCK_STREAM, socket.AI_CANONNAME)
badresults[iname] = []
badresults[iname] = []
for (_fam, _stype, _proto, cname, sockaddr) in result:
badresults[iname].append("%s: %s" % (cname, sockaddr[0]))
badips.add(sockaddr[0])
@ -856,7 +857,7 @@ def is_resolvable(name):
_DNS_REDIRECT_IP = badips
if badresults:
LOG.debug("detected dns redirection: %s" % badresults)
try:
result = socket.getaddrinfo(name, None)
# check first result's sockaddr field
@ -874,7 +875,7 @@ def get_hostname():
def is_resolvable_url(url):
""" determine if this url is resolvable (existing or ip) """
"""determine if this url is resolvable (existing or ip)."""
return (is_resolvable(urlparse.urlparse(url).hostname))
@ -1105,7 +1106,7 @@ def hash_blob(blob, routine, mlen=None):
def rename(src, dest):
LOG.debug("Renaming %s to %s", src, dest)
# TODO use a se guard here??
# TODO(harlowja) use a se guard here??
os.rename(src, dest)

View File

@ -1,6 +1,6 @@
import StringIO
import logging
import os
import StringIO
import sys
from mocker import MockerTestCase, ANY, ARGS, KWARGS
@ -61,14 +61,14 @@ class TestWalkerHandleHandler(MockerTestCase):
import_mock(self.expected_module_name)
self.mocker.result(self.module_fake)
self.mocker.replay()
handlers.walker_handle_handler(self.data, self.ctype, self.filename,
self.payload)
self.assertEqual(1, self.data["handlercount"])
def test_import_error(self):
"""Module import errors are logged. No handler added to C{pdata}"""
"""Module import errors are logged. No handler added to C{pdata}."""
import_mock = self.mocker.replace(importer.import_module,
passthrough=False)
import_mock(self.expected_module_name)
@ -81,7 +81,7 @@ class TestWalkerHandleHandler(MockerTestCase):
self.assertEqual(0, self.data["handlercount"])
def test_attribute_error(self):
"""Attribute errors are logged. No handler added to C{pdata}"""
"""Attribute errors are logged. No handler added to C{pdata}."""
import_mock = self.mocker.replace(importer.import_module,
passthrough=False)
import_mock(self.expected_module_name)
@ -156,7 +156,7 @@ class TestHandlerHandlePart(MockerTestCase):
self.payload, self.frequency)
def test_no_handle_when_modfreq_once(self):
"""C{handle_part} is not called if frequency is once"""
"""C{handle_part} is not called if frequency is once."""
self.frequency = "once"
mod_mock = self.mocker.mock()
getattr(mod_mock, "frequency")

View File

@ -1,4 +1,4 @@
"""Tests of the built-in user data handlers"""
"""Tests of the built-in user data handlers."""
import os
@ -33,7 +33,7 @@ class TestBuiltins(MockerTestCase):
None, None, None)
self.assertEquals(0, len(os.listdir(up_root)))
def test_upstart_frequency_single(self):
def test_upstart_frequency_single(self):
c_root = self.makeDir()
up_root = self.makeDir()
paths = helpers.Paths({

View File

@ -25,14 +25,15 @@ import os
import shutil
import tempfile
from unittest import TestCase
from cloudinit import helpers
from unittest import TestCase
# Get the cloudinit.sources.DataSourceAltCloud import items needed.
import cloudinit.sources.DataSourceAltCloud
from cloudinit.sources.DataSourceAltCloud import DataSourceAltCloud
from cloudinit.sources.DataSourceAltCloud import read_user_data_callback
def _write_cloud_info_file(value):
'''
Populate the CLOUD_INFO_FILE which would be populated
@ -44,12 +45,14 @@ def _write_cloud_info_file(value):
cifile.close()
os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664)
def _remove_cloud_info_file():
'''
Remove the test CLOUD_INFO_FILE
'''
os.remove(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE)
def _write_user_data_files(mount_dir, value):
'''
Populate the deltacloud_user_data_file the user_data_file
@ -68,6 +71,7 @@ def _write_user_data_files(mount_dir, value):
udfile.close()
os.chmod(user_data_file, 0664)
def _remove_user_data_files(mount_dir,
dc_file=True,
non_dc_file=True):
@ -91,14 +95,15 @@ def _remove_user_data_files(mount_dir,
except OSError:
pass
class TestGetCloudType(TestCase):
'''
Test to exercise method: DataSourceAltCloud.get_cloud_type()
Test to exercise method: DataSourceAltCloud.get_cloud_type()
'''
def setUp(self):
''' Set up '''
self.paths = helpers.Paths({ 'cloud_dir': '/tmp' })
'''Set up.'''
self.paths = helpers.Paths({'cloud_dir': '/tmp'})
def tearDown(self):
# Reset
@ -158,14 +163,15 @@ class TestGetCloudType(TestCase):
self.assertEquals('UNKNOWN', \
dsrc.get_cloud_type())
class TestGetDataCloudInfoFile(TestCase):
'''
Test to exercise method: DataSourceAltCloud.get_data()
Test to exercise method: DataSourceAltCloud.get_data()
With a contrived CLOUD_INFO_FILE
'''
def setUp(self):
''' Set up '''
self.paths = helpers.Paths({ 'cloud_dir': '/tmp' })
'''Set up.'''
self.paths = helpers.Paths({'cloud_dir': '/tmp'})
self.cloud_info_file = tempfile.mkstemp()[1]
cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
self.cloud_info_file
@ -183,52 +189,53 @@ class TestGetDataCloudInfoFile(TestCase):
'/etc/sysconfig/cloud-info'
def test_rhev(self):
'''Success Test module get_data() forcing RHEV '''
'''Success Test module get_data() forcing RHEV.'''
_write_cloud_info_file('RHEV')
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_rhevm = lambda : True
dsrc.user_data_rhevm = lambda: True
self.assertEquals(True, dsrc.get_data())
def test_vsphere(self):
'''Success Test module get_data() forcing VSPHERE '''
'''Success Test module get_data() forcing VSPHERE.'''
_write_cloud_info_file('VSPHERE')
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_vsphere = lambda : True
dsrc.user_data_vsphere = lambda: True
self.assertEquals(True, dsrc.get_data())
def test_fail_rhev(self):
'''Failure Test module get_data() forcing RHEV '''
'''Failure Test module get_data() forcing RHEV.'''
_write_cloud_info_file('RHEV')
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_rhevm = lambda : False
dsrc.user_data_rhevm = lambda: False
self.assertEquals(False, dsrc.get_data())
def test_fail_vsphere(self):
'''Failure Test module get_data() forcing VSPHERE '''
'''Failure Test module get_data() forcing VSPHERE.'''
_write_cloud_info_file('VSPHERE')
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_vsphere = lambda : False
dsrc.user_data_vsphere = lambda: False
self.assertEquals(False, dsrc.get_data())
def test_unrecognized(self):
'''Failure Test module get_data() forcing unrecognized '''
'''Failure Test module get_data() forcing unrecognized.'''
_write_cloud_info_file('unrecognized')
dsrc = DataSourceAltCloud({}, None, self.paths)
self.assertEquals(False, dsrc.get_data())
class TestGetDataNoCloudInfoFile(TestCase):
'''
Test to exercise method: DataSourceAltCloud.get_data()
Test to exercise method: DataSourceAltCloud.get_data()
Without a CLOUD_INFO_FILE
'''
def setUp(self):
''' Set up '''
self.paths = helpers.Paths({ 'cloud_dir': '/tmp' })
'''Set up.'''
self.paths = helpers.Paths({'cloud_dir': '/tmp'})
cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
'no such file'
@ -240,38 +247,39 @@ class TestGetDataNoCloudInfoFile(TestCase):
['dmidecode', '--string', 'system-product-name']
def test_rhev_no_cloud_file(self):
'''Test No cloud info file module get_data() forcing RHEV '''
'''Test No cloud info file module get_data() forcing RHEV.'''
cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
['echo', 'RHEV Hypervisor']
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_rhevm = lambda : True
dsrc.user_data_rhevm = lambda: True
self.assertEquals(True, dsrc.get_data())
def test_vsphere_no_cloud_file(self):
'''Test No cloud info file module get_data() forcing VSPHERE '''
'''Test No cloud info file module get_data() forcing VSPHERE.'''
cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
['echo', 'VMware Virtual Platform']
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_vsphere = lambda : True
dsrc.user_data_vsphere = lambda: True
self.assertEquals(True, dsrc.get_data())
def test_failure_no_cloud_file(self):
'''Test No cloud info file module get_data() forcing unrecognized '''
'''Test No cloud info file module get_data() forcing unrecognized.'''
cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
['echo', 'Unrecognized Platform']
dsrc = DataSourceAltCloud({}, None, self.paths)
self.assertEquals(False, dsrc.get_data())
class TestUserDataRhevm(TestCase):
'''
Test to exercise method: DataSourceAltCloud.user_data_rhevm()
Test to exercise method: DataSourceAltCloud.user_data_rhevm()
'''
def setUp(self):
''' Set up '''
self.paths = helpers.Paths({ 'cloud_dir': '/tmp' })
'''Set up.'''
self.paths = helpers.Paths({'cloud_dir': '/tmp'})
self.mount_dir = tempfile.mkdtemp()
_write_user_data_files(self.mount_dir, 'test user data')
@ -295,7 +303,7 @@ class TestUserDataRhevm(TestCase):
['/sbin/udevadm', 'settle', '--quiet', '--timeout=5']
def test_mount_cb_fails(self):
'''Test user_data_rhevm() where mount_cb fails'''
'''Test user_data_rhevm() where mount_cb fails.'''
cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
['echo', 'modprobe floppy']
@ -305,7 +313,7 @@ class TestUserDataRhevm(TestCase):
self.assertEquals(False, dsrc.user_data_rhevm())
def test_modprobe_fails(self):
'''Test user_data_rhevm() where modprobe fails. '''
'''Test user_data_rhevm() where modprobe fails.'''
cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
['ls', 'modprobe floppy']
@ -315,7 +323,7 @@ class TestUserDataRhevm(TestCase):
self.assertEquals(False, dsrc.user_data_rhevm())
def test_no_modprobe_cmd(self):
'''Test user_data_rhevm() with no modprobe command. '''
'''Test user_data_rhevm() with no modprobe command.'''
cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
['bad command', 'modprobe floppy']
@ -325,7 +333,7 @@ class TestUserDataRhevm(TestCase):
self.assertEquals(False, dsrc.user_data_rhevm())
def test_udevadm_fails(self):
'''Test user_data_rhevm() where udevadm fails. '''
'''Test user_data_rhevm() where udevadm fails.'''
cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
['ls', 'udevadm floppy']
@ -335,7 +343,7 @@ class TestUserDataRhevm(TestCase):
self.assertEquals(False, dsrc.user_data_rhevm())
def test_no_udevadm_cmd(self):
'''Test user_data_rhevm() with no udevadm command. '''
'''Test user_data_rhevm() with no udevadm command.'''
cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
['bad command', 'udevadm floppy']
@ -344,13 +352,14 @@ class TestUserDataRhevm(TestCase):
self.assertEquals(False, dsrc.user_data_rhevm())
class TestUserDataVsphere(TestCase):
'''
Test to exercise method: DataSourceAltCloud.user_data_vsphere()
Test to exercise method: DataSourceAltCloud.user_data_vsphere()
'''
def setUp(self):
''' Set up '''
self.paths = helpers.Paths({ 'cloud_dir': '/tmp' })
'''Set up.'''
self.paths = helpers.Paths({'cloud_dir': '/tmp'})
self.mount_dir = tempfile.mkdtemp()
_write_user_data_files(self.mount_dir, 'test user data')
@ -370,7 +379,7 @@ class TestUserDataVsphere(TestCase):
'/etc/sysconfig/cloud-info'
def test_user_data_vsphere(self):
'''Test user_data_vsphere() where mount_cb fails'''
'''Test user_data_vsphere() where mount_cb fails.'''
cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = self.mount_dir
@ -378,13 +387,14 @@ class TestUserDataVsphere(TestCase):
self.assertEquals(False, dsrc.user_data_vsphere())
class TestReadUserDataCallback(TestCase):
'''
Test to exercise method: DataSourceAltCloud.read_user_data_callback()
Test to exercise method: DataSourceAltCloud.read_user_data_callback()
'''
def setUp(self):
''' Set up '''
self.paths = helpers.Paths({ 'cloud_dir': '/tmp' })
'''Set up.'''
self.paths = helpers.Paths({'cloud_dir': '/tmp'})
self.mount_dir = tempfile.mkdtemp()
_write_user_data_files(self.mount_dir, 'test user data')
@ -400,15 +410,14 @@ class TestReadUserDataCallback(TestCase):
except OSError:
pass
def test_callback_both(self):
'''Test read_user_data_callback() with both files'''
'''Test read_user_data_callback() with both files.'''
self.assertEquals('test user data',
read_user_data_callback(self.mount_dir))
def test_callback_dc(self):
'''Test read_user_data_callback() with only DC file'''
'''Test read_user_data_callback() with only DC file.'''
_remove_user_data_files(self.mount_dir,
dc_file=False,
@ -418,7 +427,7 @@ class TestReadUserDataCallback(TestCase):
read_user_data_callback(self.mount_dir))
def test_callback_non_dc(self):
'''Test read_user_data_callback() with only non-DC file'''
'''Test read_user_data_callback() with only non-DC file.'''
_remove_user_data_files(self.mount_dir,
dc_file=True,
@ -428,9 +437,9 @@ class TestReadUserDataCallback(TestCase):
read_user_data_callback(self.mount_dir))
def test_callback_none(self):
'''Test read_user_data_callback() no files are found'''
'''Test read_user_data_callback() no files are found.'''
_remove_user_data_files(self.mount_dir)
_remove_user_data_files(self.mount_dir)
self.assertEquals(None, read_user_data_callback(self.mount_dir))
# vi: ts=4 expandtab

View File

@ -1,8 +1,8 @@
import os
from copy import copy
import os
from cloudinit import url_helper
from cloudinit.sources import DataSourceMAAS
from cloudinit import url_helper
from mocker import MockerTestCase
@ -15,7 +15,7 @@ class TestMAASDataSource(MockerTestCase):
self.tmp = self.makeDir()
def test_seed_dir_valid(self):
"""Verify a valid seeddir is read as such"""
"""Verify a valid seeddir is read as such."""
data = {'instance-id': 'i-valid01',
'local-hostname': 'valid01-hostname',
@ -35,7 +35,7 @@ class TestMAASDataSource(MockerTestCase):
self.assertFalse(('user-data' in metadata))
def test_seed_dir_valid_extra(self):
"""Verify extra files do not affect seed_dir validity """
"""Verify extra files do not affect seed_dir validity."""
data = {'instance-id': 'i-valid-extra',
'local-hostname': 'valid-extra-hostname',
@ -54,7 +54,7 @@ class TestMAASDataSource(MockerTestCase):
self.assertFalse(('foo' in metadata))
def test_seed_dir_invalid(self):
"""Verify that invalid seed_dir raises MAASSeedDirMalformed"""
"""Verify that invalid seed_dir raises MAASSeedDirMalformed."""
valid = {'instance-id': 'i-instanceid',
'local-hostname': 'test-hostname', 'user-data': ''}
@ -78,20 +78,20 @@ class TestMAASDataSource(MockerTestCase):
DataSourceMAAS.read_maas_seed_dir, my_d)
def test_seed_dir_none(self):
"""Verify that empty seed_dir raises MAASSeedDirNone"""
"""Verify that empty seed_dir raises MAASSeedDirNone."""
my_d = os.path.join(self.tmp, "valid_empty")
self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
DataSourceMAAS.read_maas_seed_dir, my_d)
def test_seed_dir_missing(self):
"""Verify that missing seed_dir raises MAASSeedDirNone"""
self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
"""Verify that missing seed_dir raises MAASSeedDirNone."""
self.assertRaises(DataSourceMAAS.MAASSeedDirNone,
DataSourceMAAS.read_maas_seed_dir,
os.path.join(self.tmp, "nonexistantdirectory"))
def test_seed_url_valid(self):
"""Verify that valid seed_url is read as such"""
"""Verify that valid seed_url is read as such."""
valid = {'meta-data/instance-id': 'i-instanceid',
'meta-data/local-hostname': 'test-hostname',
'meta-data/public-keys': 'test-hostname',
@ -129,11 +129,11 @@ class TestMAASDataSource(MockerTestCase):
valid['meta-data/local-hostname'])
def test_seed_url_invalid(self):
"""Verify that invalid seed_url raises MAASSeedDirMalformed"""
"""Verify that invalid seed_url raises MAASSeedDirMalformed."""
pass
def test_seed_url_missing(self):
"""Verify seed_url with no found entries raises MAASSeedDirNone"""
"""Verify seed_url with no found entries raises MAASSeedDirNone."""
pass

View File

@ -1,8 +1,8 @@
from mocker import MockerTestCase
from cloudinit import util
from cloudinit import cloud
from cloudinit import helpers
from cloudinit import util
from cloudinit.config import cc_ca_certs
@ -64,7 +64,7 @@ class TestConfig(MockerTestCase):
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_empty_trusted_list(self):
"""Test that no certificate are written if 'trusted' list is empty"""
"""Test that no certificate are written if 'trusted' list is empty."""
config = {"ca-certs": {"trusted": []}}
# No functions should be called
@ -74,7 +74,7 @@ class TestConfig(MockerTestCase):
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_single_trusted(self):
"""Test that a single cert gets passed to add_ca_certs"""
"""Test that a single cert gets passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1"]}}
self.mock_add(self.paths, ["CERT1"])
@ -84,7 +84,7 @@ class TestConfig(MockerTestCase):
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_multiple_trusted(self):
"""Test that multiple certs get passed to add_ca_certs"""
"""Test that multiple certs get passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}}
self.mock_add(self.paths, ["CERT1", "CERT2"])
@ -94,7 +94,7 @@ class TestConfig(MockerTestCase):
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_remove_default_ca_certs(self):
"""Test remove_defaults works as expected"""
"""Test remove_defaults works as expected."""
config = {"ca-certs": {"remove-defaults": True}}
self.mock_remove(self.paths)
@ -104,7 +104,7 @@ class TestConfig(MockerTestCase):
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_no_remove_defaults_if_false(self):
"""Test remove_defaults is not called when config value is False"""
"""Test remove_defaults is not called when config value is False."""
config = {"ca-certs": {"remove-defaults": False}}
self.mock_update()
@ -113,7 +113,7 @@ class TestConfig(MockerTestCase):
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
def test_correct_order_for_remove_then_add(self):
"""Test remove_defaults is not called when config value is False"""
"""Test remove_defaults is not called when config value is False."""
config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}}
self.mock_remove(self.paths)
@ -139,7 +139,7 @@ class TestAddCaCerts(MockerTestCase):
cc_ca_certs.add_ca_certs(self.paths, [])
def test_single_cert(self):
"""Test adding a single certificate to the trusted CAs"""
"""Test adding a single certificate to the trusted CAs."""
cert = "CERT1\nLINE2\nLINE3"
mock_write = self.mocker.replace(util.write_file, passthrough=False)
@ -152,7 +152,7 @@ class TestAddCaCerts(MockerTestCase):
cc_ca_certs.add_ca_certs(self.paths, [cert])
def test_multiple_certs(self):
"""Test adding multiple certificates to the trusted CAs"""
"""Test adding multiple certificates to the trusted CAs."""
certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"]
expected_cert_file = "\n".join(certs)

View File

@ -1,4 +1,4 @@
"""Tests for handling of userdata within cloud init"""
"""Tests for handling of userdata within cloud init."""
import StringIO
@ -54,7 +54,7 @@ class TestConsumeUserData(MockerTestCase):
return log_file
def test_unhandled_type_warning(self):
"""Raw text without magic is ignored but shows warning"""
"""Raw text without magic is ignored but shows warning."""
ci = stages.Init()
data = "arbitrary text\n"
ci.datasource = FakeDataSource(data)
@ -70,7 +70,7 @@ class TestConsumeUserData(MockerTestCase):
log_file.getvalue())
def test_mime_text_plain(self):
"""Mime message of type text/plain is ignored but shows warning"""
"""Mime message of type text/plain is ignored but shows warning."""
ci = stages.Init()
message = MIMEBase("text", "plain")
message.set_payload("Just text")
@ -86,9 +86,8 @@ class TestConsumeUserData(MockerTestCase):
"Unhandled unknown content-type (text/plain)",
log_file.getvalue())
def test_shellscript(self):
"""Raw text starting #!/bin/sh is treated as script"""
"""Raw text starting #!/bin/sh is treated as script."""
ci = stages.Init()
script = "#!/bin/sh\necho hello\n"
ci.datasource = FakeDataSource(script)
@ -104,7 +103,7 @@ class TestConsumeUserData(MockerTestCase):
self.assertEqual("", log_file.getvalue())
def test_mime_text_x_shellscript(self):
"""Mime message of type text/x-shellscript is treated as script"""
"""Mime message of type text/x-shellscript is treated as script."""
ci = stages.Init()
script = "#!/bin/sh\necho hello\n"
message = MIMEBase("text", "x-shellscript")
@ -122,7 +121,7 @@ class TestConsumeUserData(MockerTestCase):
self.assertEqual("", log_file.getvalue())
def test_mime_text_plain_shell(self):
"""Mime type text/plain starting #!/bin/sh is treated as script"""
"""Mime type text/plain starting #!/bin/sh is treated as script."""
ci = stages.Init()
script = "#!/bin/sh\necho hello\n"
message = MIMEBase("text", "plain")

View File

@ -1,11 +1,11 @@
import os
import stat
from unittest import TestCase
from mocker import MockerTestCase
from unittest import TestCase
from cloudinit import util
from cloudinit import importer
from cloudinit import util
class FakeSelinux(object):

View File

@ -100,7 +100,7 @@ def cloud_todo_format(physical_line):
"""
pos = physical_line.find('TODO')
pos1 = physical_line.find('TODO(')
pos2 = physical_line.find('#') # make sure it's a comment
pos2 = physical_line.find('#') # make sure it's a comment
if (pos != pos1 and pos2 >= 0 and pos2 < pos):
return pos, "N101: Use TODO(NAME)"
@ -133,7 +133,6 @@ def cloud_docstring_multiline_end(physical_line):
return (pos, "N403: multi line docstring end on new line")
current_file = ""
@ -169,4 +168,3 @@ if __name__ == "__main__":
if len(_missingImport) > 0:
print >> sys.stderr, ("%i imports missing in this test environment"
% len(_missingImport))
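
Going only by the body of cloud_todo_format() visible above, the check behaves like this (the import assumes the usual tools/hacking.py layout with that directory on sys.path):

    from hacking import cloud_todo_format    # assumed module location

    print cloud_todo_format("# TODO fix this")
    # -> (2, 'N101: Use TODO(NAME)')
    print cloud_todo_format("# TODO(harlowja) fix this")
    # -> None (no complaint), per the lines shown here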

View File

@ -6,7 +6,7 @@
"""
To use this to mimic the EC2 metadata service entirely, run it like:
# Where 'eth0' is *some* interface.
# Where 'eth0' is *some* interface.
sudo ifconfig eth0:0 169.254.169.254 netmask 255.255.255.255
sudo ./mock-meta.py -a 169.254.169.254 -p 80
@ -23,7 +23,7 @@ import json
import logging
import os
import random
import string # pylint: disable=W0402
import string # pylint: disable=W0402
import sys
import yaml
@ -156,6 +156,8 @@ def traverse(keys, mp):
ID_CHARS = [c for c in (string.ascii_uppercase + string.digits)]
def id_generator(size=6, lower=False):
txt = ''.join(random.choice(ID_CHARS) for x in range(size))
if lower:
@ -235,11 +237,11 @@ class MetaDataHandler(object):
nparams = params[1:]
# This is a weird kludge, why amazon why!!!
# public-keys is messed up, list of /latest/meta-data/public-keys/
# shows something like: '0=brickies'
# but a GET to /latest/meta-data/public-keys/0=brickies will fail
# you have to know to get '/latest/meta-data/public-keys/0', then
# from there you get a 'openssh-key', which you can get.
# this hunk of code just re-works the object for that.
# shows something like: '0=brickies'
# but a GET to /latest/meta-data/public-keys/0=brickies will fail
# you have to know to get '/latest/meta-data/public-keys/0', then
# from there you get a 'openssh-key', which you can get.
# this hunk of code just re-works the object for that.
avail_keys = get_ssh_keys()
key_ids = sorted(list(avail_keys.keys()))
if nparams:
@ -255,7 +257,7 @@ class MetaDataHandler(object):
"openssh-key": "\n".join(avail_keys[key_name]),
})
if isinstance(result, (dict)):
# TODO: This might not be right??
# TODO(harlowja): This might not be right??
result = "\n".join(sorted(result.keys()))
if not result:
result = ''
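
The public-keys quirk described in that comment is the same two-step lookup a real client has to do; a minimal Python 2 sketch against the metadata address used above (or this mock server):

    import urllib2

    BASE = "http://169.254.169.254/latest/meta-data/public-keys"

    # The listing gives lines like '0=brickies'; only the index before '='
    # is a valid path element, and the key text lives under 'openssh-key'.
    for line in urllib2.urlopen(BASE + "/").read().splitlines():
        if not line:
            continue
        idx = line.split("=", 1)[0]
        key = urllib2.urlopen("%s/%s/openssh-key" % (BASE, idx)).read()
        print "key %s: %s" % (idx, key.strip())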
@ -304,13 +306,13 @@ class UserDataHandler(object):
blob = "\n".join(lines)
return blob.strip()
def get_data(self, params, who, **kwargs): # pylint: disable=W0613
def get_data(self, params, who, **kwargs): # pylint: disable=W0613
if not params:
return self._get_user_blob(who=who)
return NOT_IMPL_RESPONSE
# Seem to need to use globals since can't pass
# Seem to need to use globals since can't pass
# data into the request handlers instances...
# Puke!
meta_fetcher = None
@ -432,7 +434,7 @@ def setup_fetchers(opts):
def run_server():
# Using global here since it doesn't seem like we
# Using global here since it doesn't seem like we
# can pass opts into a request handler constructor...
opts = extract_opts()
setup_logging(logging.DEBUG)