packstack/packstack/installer/validators.py

264 lines
7.7 KiB
Python

# -*- coding: utf-8 -*-
"""
Contains all core validation functions.
"""
import os
import re
import socket
import logging
from . import utils
from .exceptions import ParamValidationError
__all__ = ('ParamValidationError', 'validate_integer', 'validate_float',
'validate_regexp', 'validate_port', 'validate_not_empty',
'validate_options', 'validate_multi_options', 'validate_ip',
'validate_multi_ip', 'validate_file', 'validate_ping',
'validate_multi_ping', 'validate_ssh', 'validate_multi_ssh',
'validate_sshkey')
def validate_integer(param, options=None):
"""
Raises ParamValidationError if given param is not integer.
"""
options = options or []
try:
int(param)
except ValueError:
logging.debug('validate_integer(%s, options=%s) failed.' %
(param, options))
msg = 'Given value is not an integer: %s'
raise ParamValidationError(msg % param)
def validate_float(param, options=None):
"""
Raises ParamValidationError if given param is not a float.
"""
options = options or []
try:
float(param)
except ValueError:
logging.debug('validate_float(%s, options=%s) failed.' %
(param, options))
msg = 'Given value is not a float: %s'
raise ParamValidationError(msg % param)
def validate_regexp(param, options=None):
"""
Raises ParamValidationError if given param doesn't match at least
one of regular expressions given in options.
"""
options = options or []
for regex in options:
if re.search(regex, param):
break
else:
logging.debug('validate_regexp(%s, options=%s) failed.' %
(param, options))
msg = 'Given value does not match required regular expression: %s'
raise ParamValidationError(msg % param)
def validate_multi_regexp(param, options=None):
"""
Raises ParamValidationError if any of the comma separated values given
in param doesn't match one of the regular expressions given in options.
"""
options = options or []
for i in param.split(','):
validate_regexp(i.strip(), options=options)
def validate_port(param, options=None):
"""
Raises ParamValidationError if given param is not a decimal number
in range (0, 65535).
"""
options = options or []
validate_integer(param, options)
port = int(param)
if not (port >= 0 and port < 65535):
logging.debug('validate_port(%s, options=%s) failed.' %
(param, options))
msg = 'Given value is outside the range of (0, 65535): %s'
raise ParamValidationError(msg % param)
def validate_not_empty(param, options=None):
"""
Raises ParamValidationError if given param is empty.
"""
options = options or []
if not param and param is not False:
logging.debug('validate_not_empty(%s, options=%s) failed.' %
(param, options))
msg = 'Given value is not allowed: %s'
raise ParamValidationError(msg % param)
def validate_options(param, options=None):
"""
Raises ParamValidationError if given param is not member of options.
"""
options = options or []
# TO-DO: to be more flexible, remove this and exit in case param is empty
validate_not_empty(param, options)
if param not in options:
logging.debug('validate_options(%s, options=%s) failed.' %
(param, options))
msg = 'Given value is not member of allowed values %s: %s'
raise ParamValidationError(msg % (options, param))
def validate_multi_options(param, options=None):
"""
Validates if comma separated values given in params are members
of options.
"""
if not param:
return
options = options or []
for i in param.split(','):
validate_options(i.strip(), options=options)
def validate_ip(param, options=None):
"""
Raises ParamValidationError if given parameter value is not in IPv4
or IPv6 address.
"""
for family in (socket.AF_INET, socket.AF_INET6):
try:
socket.inet_pton(family, param)
break
except socket.error:
continue
else:
logging.debug('validate_ip(%s, options=%s) failed.' %
(param, options))
msg = 'Given host is not in IP address format: %s'
raise ParamValidationError(msg % param)
def validate_multi_ip(param, options=None):
"""
Raises ParamValidationError if comma separated IP addresses given
parameter value are in IPv4 or IPv6 aformat.
"""
for host in param.split(','):
host = host.split('/', 1)[0]
validate_ip(host.strip(), options)
def validate_file(param, options=None):
"""
Raises ParamValidationError if provided file in param does not exist.
"""
options = options or []
# TO-DO: to be more flexible, remove this and exit in case param is empty
validate_not_empty(param)
if not os.path.isfile(param):
logging.debug('validate_file(%s, options=%s) failed.' %
(param, options))
msg = 'Given file does not exist: %s'
raise ParamValidationError(msg % param)
def validate_ping(param, options=None):
"""
Raises ParamValidationError if provided host does not answer to ICMP
echo request.
"""
options = options or []
# TO-DO: to be more flexible, remove this and exit in case param is empty
validate_not_empty(param)
rc, out = utils.execute(['/bin/ping', '-c', '1', str(param)],
can_fail=False)
if rc != 0:
logging.debug('validate_ping(%s, options=%s) failed.' %
(param, options))
msg = 'Given host is unreachable: %s'
raise ParamValidationError(msg % param)
def validate_multi_ping(param, options=None):
"""
Raises ParamValidationError if comma separated host given in param
do not answer to ICMP echo request.
"""
options = options or []
# TO-DO: to be more flexible, remove this and exit in case param is empty
validate_not_empty(param)
for host in param.split(","):
validate_ping(host.strip())
_tested_ports = []
def touch_port(host, port):
"""
Check that provided host is listening on provided port.
"""
key = "%s:%d" % (host, port)
if key in _tested_ports:
return
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
s.shutdown(socket.SHUT_RDWR)
s.close()
_tested_ports.append(key)
def validate_ssh(param, options=None):
"""
Raises ParamValidationError if provided host does not listen
on port 22.
"""
options = options or []
try:
touch_port(param.strip(), 22)
except socket.error:
logging.debug('validate_ssh(%s, options=%s) failed.' %
(param, options))
msg = 'Given host does not listen on port 22: %s'
raise ParamValidationError(msg % param)
def validate_multi_ssh(param, options=None):
"""
Raises ParamValidationError if comma separated host provided
in param do not listen on port 22.
"""
options = options or []
for host in param.split(","):
validate_ssh(host)
def validate_sshkey(param, options=None):
"""
Raises ParamValidationError if provided sshkey file is not public key.
"""
if not param:
return
with open(param) as sshkey:
line = sshkey.readline()
msg = None
if not re.search('ssh-|ecdsa', line):
msg = ('Invalid content header in %s, Public SSH key is required.'
% param)
if re.search('BEGIN [RD]SA PRIVATE KEY', line):
msg = 'Public SSH key is required. You passed private key.'
if msg:
raise ParamValidationError(msg)