Merge "Support multiple detection types in health policy"

Zuul 2018-10-09 23:17:19 +00:00 committed by Gerrit Code Review
commit 5e4baf5ae0
9 changed files with 1699 additions and 865 deletions


@@ -288,6 +288,12 @@ RECOVERY_ACTIONS = (
'REBOOT', 'REBUILD', 'RECREATE',
)
RECOVERY_CONDITIONAL = (
ALL_FAILED, ANY_FAILED,
) = (
'ALL_FAILED', 'ANY_FAILED',
)
NOTIFICATION_PRIORITIES = (
PRIO_AUDIT, PRIO_CRITICAL, PRIO_ERROR, PRIO_WARN, PRIO_INFO, PRIO_DEBUG,
PRIO_SAMPLE,
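# A standalone sketch of the chained tuple-unpacking idiom used by the
# constant groups above: a single statement defines both the group tuple
# and each symbol's string value. Runnable on its own.
RECOVERY_CONDITIONAL = (
    ALL_FAILED, ANY_FAILED,
) = (
    'ALL_FAILED', 'ANY_FAILED',
)

assert RECOVERY_CONDITIONAL == ('ALL_FAILED', 'ANY_FAILED')
assert ALL_FAILED == 'ALL_FAILED' and ANY_FAILED == 'ANY_FAILED'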


@@ -88,7 +88,7 @@ def level_from_number(value):
return levels.get(n, None)
def url_fetch(url, allowed_schemes=('http', 'https'), verify=True):
def url_fetch(url, timeout=1, allowed_schemes=('http', 'https'), verify=True):
"""Get the data at the specified URL.
The URL must use the http: or https: schemes.
@@ -96,7 +96,6 @@ def url_fetch(url, allowed_schemes=('http', 'https'), verify=True):
the allowed_schemes argument.
Raise an IOError if getting the data fails.
"""
LOG.info('Fetching data from %s', url)
components = urllib.parse.urlparse(url)
@@ -105,12 +104,12 @@ def url_fetch(url, allowed_schemes=('http', 'https'), verify=True):
if components.scheme == 'file':
try:
return urllib.request.urlopen(url).read()
return urllib.request.urlopen(url, timeout=timeout).read()
except urllib.error.URLError as uex:
raise URLFetchError(_('Failed to retrieve data: %s') % uex)
try:
resp = requests.get(url, stream=True, verify=verify)
resp = requests.get(url, stream=True, verify=verify, timeout=timeout)
resp.raise_for_status()
# We cannot use resp.text here because it would download the entire
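# A minimal standalone sketch (not Senlin's url_fetch itself) showing why
# the timeout parameter added above matters: without it, a hung health
# endpoint would block the polling thread indefinitely. The helper name
# and default sizes are illustrative.
import requests


def fetch_status(url, timeout=1, verify=True, max_size=1024):
    """Return up to max_size bytes of the response body, or raise IOError."""
    try:
        resp = requests.get(url, stream=True, verify=verify, timeout=timeout)
        resp.raise_for_status()
    except requests.exceptions.RequestException as ex:
        raise IOError('Failed to retrieve data: %s' % ex)
    # Stream and read only a bounded amount, mirroring the pattern above.
    return resp.raw.read(max_size, decode_content=True)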


@@ -17,6 +17,8 @@ trigger corresponding actions to recover the clusters based on the pre-defined
health policies.
"""
from collections import defaultdict
from collections import namedtuple
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
@@ -24,13 +26,13 @@ from oslo_service import service
from oslo_service import threadgroup
from oslo_utils import timeutils
import re
import six
import time
from senlin.common import consts
from senlin.common import context
from senlin.common import messaging as rpc
from senlin.common import utils
from senlin.engine import node as node_mod
from senlin import objects
from senlin.rpc import client as rpc_client
@@ -195,94 +197,91 @@ def ListenerProc(exchange, project_id, cluster_id, recover_action):
listener.start()
class HealthManager(service.Service):
class HealthCheckType(object):
@staticmethod
def factory(detection_type, cid, interval, params):
node_update_timeout = params['node_update_timeout']
detection_params = [
p for p in params['detection_modes']
if p['type'] == detection_type
]
if len(detection_params) != 1:
raise Exception(
'The same detection mode cannot be used more than once in the '
'same policy. Encountered {} instances of '
'type {}.'.format(len(detection_params), detection_type)
)
def __init__(self, engine_service, topic, version):
super(HealthManager, self).__init__()
self.TG = threadgroup.ThreadGroup()
self.engine_id = engine_service.engine_id
self.topic = topic
self.version = version
self.ctx = context.get_admin_context()
self.rpc_client = rpc_client.EngineClient()
self.rt = {
'registries': [],
}
def _dummy_task(self):
"""A Dummy task that is queued on the health manager thread group.
The task is here so that the service always has something to wait()
on, or else the process will exit.
"""
self._load_runtime_registry()
def _wait_for_action(self, ctx, action_id, timeout):
done = False
req = objects.ActionGetRequest(identity=action_id)
with timeutils.StopWatch(timeout) as timeout_watch:
while timeout > 0:
action = self.rpc_client.call(ctx, 'action_get', req)
if action['status'] in [consts.ACTION_SUCCEEDED,
consts.ACTION_FAILED,
consts.ACTION_CANCELLED]:
if action['status'] == consts.ACTION_SUCCEEDED:
done = True
break
time.sleep(2)
timeout = timeout_watch.leftover(True)
if done:
return True, ""
elif timeout <= 0:
return False, "Timeout while polling cluster status"
if detection_type == consts.NODE_STATUS_POLLING:
return NodePollStatusHealthCheck(
cid, interval, node_update_timeout, detection_params[0])
elif detection_type == consts.NODE_STATUS_POLL_URL:
return NodePollUrlHealthCheck(
cid, interval, node_update_timeout, detection_params[0])
else:
return False, "Cluster check action failed or cancelled"
raise Exception(
'Invalid detection type: {}'.format(detection_type))
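# A self-contained illustration of the parameter filtering the factory
# performs: exactly one detection_modes entry may match the requested
# type. The params layout loosely mirrors what register_cluster() stores;
# the concrete values are made up for the example.
params = {
    'node_update_timeout': 300,
    'detection_modes': [
        {'type': 'NODE_STATUS_POLLING'},
        {'type': 'NODE_STATUS_POLL_URL',
         'poll_url': 'http://192.0.2.10/healthz'},
    ],
}

detection_type = 'NODE_STATUS_POLL_URL'
matching = [m for m in params['detection_modes']
            if m['type'] == detection_type]
assert len(matching) == 1   # two entries of the same type would raise above
print(matching[0]['poll_url'])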
def _poll_cluster(self, cluster_id, timeout, recover_action):
"""Routine to be executed for polling cluster status.
def __init__(self, cluster_id, interval, node_update_timeout, params):
"""Initialize HealthCheckType
:param ctx:
:param cluster_id: The UUID of the cluster to be checked.
:param timeout: The maximum number of seconds to wait.
:param recover_action: The health policy action name.
:returns: Nothing.
:param params: Parameters specific to poll url or recovery action.
"""
self.cluster_id = cluster_id
self.interval = interval
self.node_update_timeout = node_update_timeout
self.params = params
def run_health_check(self, ctx, node):
"""Run health check on node
:returns: True if node is healthy. False otherwise.
"""
pass
class NodePollStatusHealthCheck(HealthCheckType):
def run_health_check(self, ctx, node):
"""Routine to be executed for polling node status.
:returns: True if node is healthy. False otherwise.
"""
start_time = timeutils.utcnow(True)
cluster = objects.Cluster.get(self.ctx, cluster_id, project_safe=False)
if not cluster:
LOG.warning("Cluster (%s) is not found.", cluster_id)
return _chase_up(start_time, timeout)
ctx = context.get_service_context(user_id=cluster.user,
project_id=cluster.project)
params = {'delete_check_action': True}
try:
req = objects.ClusterCheckRequest(identity=cluster_id,
params=params)
action = self.rpc_client.call(ctx, 'cluster_check', req)
# create engine node from db node
entity = node_mod.Node._from_object(ctx, node)
if not entity.do_check(ctx, return_check_result=True):
# server was not found as a result of performing check
node_last_updated = node.updated_at or node.init_at
if not timeutils.is_older_than(
node_last_updated, self.node_update_timeout):
LOG.info("Node %s was updated at %s which is less "
"than %d secs ago. Skip node recovery from "
"NodePollStatusHealthCheck.",
node.id, node_last_updated,
self.node_update_timeout)
return True
else:
return False
else:
LOG.debug("NodePollStatusHealthCheck reports node %s is "
"healthy.", node.id)
return True
except Exception as ex:
LOG.warning("Failed in triggering 'cluster_check' RPC for "
"'%(c)s': %(r)s",
{'c': cluster_id, 'r': six.text_type(ex)})
return _chase_up(start_time, timeout)
LOG.warning(
'Error when performing health check on node %s: %s',
node.id, ex
)
return False
# wait for action to complete
res, reason = self._wait_for_action(ctx, action['action'], timeout)
if not res:
LOG.warning("%s", reason)
return _chase_up(start_time, timeout)
# loop through nodes to trigger recovery
nodes = objects.Node.get_all_by_cluster(ctx, cluster_id)
for node in nodes:
if node.status != consts.NS_ACTIVE:
LOG.info("Requesting node recovery: %s", node.id)
req = objects.NodeRecoverRequest(identity=node.id,
params=recover_action)
self.rpc_client.call(ctx, 'node_recover', req)
return _chase_up(start_time, timeout)
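# A minimal standalone sketch of the update-grace-period rule used by the
# health checks: a node that fails its check is still reported healthy if
# it was updated within node_update_timeout seconds. The timestamps below
# are illustrative.
from datetime import datetime, timedelta

from oslo_utils import timeutils

node_update_timeout = 300
node_last_updated = datetime.utcnow() - timedelta(seconds=120)

if not timeutils.is_older_than(node_last_updated, node_update_timeout):
    print('node updated 120s ago; give it time instead of recovering')
else:
    print('node considered unhealthy')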
class NodePollUrlHealthCheck(HealthCheckType):
@staticmethod
def _convert_detection_tuple(dictionary):
return namedtuple('DetectionMode', dictionary.keys())(**dictionary)
def _expand_url_template(self, url_template, node):
"""Expands parameters in an URL template
@@ -300,31 +299,29 @@ class HealthManager(service.Service):
return url
def _check_url_and_recover_node(self, ctx, node, recover_action, params):
def run_health_check(self, ctx, node):
"""Routine to check a node status from a url and recovery if necessary
:param ctx: The request context to use for recovery action
:param node: The node to be checked.
:param recover_action: The health policy action name.
:param params: Parameters specific to poll url or recovery action
:returns: action if node was triggered for recovery. Otherwise None.
:returns: True if node is considered to be healthy. False otherwise.
"""
url_template = params['poll_url']
verify_ssl = params['poll_url_ssl_verify']
conn_error_as_unhealthy = params['poll_url_conn_error_as_unhealthy']
expected_resp_str = params['poll_url_healthy_response']
max_unhealthy_retry = params['poll_url_retry_limit']
retry_interval = params['poll_url_retry_interval']
node_update_timeout = params['node_update_timeout']
url_template = self.params['poll_url']
verify_ssl = self.params['poll_url_ssl_verify']
conn_error_as_unhealthy = self.params[
'poll_url_conn_error_as_unhealthy']
expected_resp_str = self.params['poll_url_healthy_response']
max_unhealthy_retry = self.params['poll_url_retry_limit']
retry_interval = self.params['poll_url_retry_interval']
def stop_node_recovery():
node_last_updated = node.updated_at or node.init_at
if not timeutils.is_older_than(
node_last_updated, node_update_timeout):
node_last_updated, self.node_update_timeout):
LOG.info("Node %s was updated at %s which is less than "
"%d secs ago. Skip node recovery.",
node.id, node_last_updated, node_update_timeout)
"%d secs ago. Skip node recovery from "
"NodePollUrlHealthCheck.",
node.id, node_last_updated, self.node_update_timeout)
return True
LOG.info("Node %s is reported as down (%d retries left)",
@@ -334,84 +331,71 @@ class HealthManager(service.Service):
return False
url = self._expand_url_template(url_template, node)
LOG.info("Polling node status from URL: %s", url)
LOG.debug("Polling node status from URL: %s", url)
available_attemps = max_unhealthy_retry
timeout = max(retry_interval * 0.1, 1)
while available_attemps > 0:
available_attemps -= 1
try:
result = utils.url_fetch(url, verify=verify_ssl)
result = utils.url_fetch(
url, timeout=timeout, verify=verify_ssl)
except utils.URLFetchError as ex:
if conn_error_as_unhealthy:
if stop_node_recovery():
return None
return True
continue
else:
LOG.error("Error when requesting node health status from"
" %s: %s", url, ex)
return None
return True
LOG.debug("Node status returned from URL(%s): %s", url,
result)
if re.search(expected_resp_str, result):
LOG.debug('Node %s is healthy', node.id)
return None
LOG.debug('NodePollUrlHealthCheck reports node %s is healthy.',
node.id)
return True
if node.status != consts.NS_ACTIVE:
LOG.info("Skip node recovery because node %s is not in "
"ACTIVE state", node.id)
return None
"ACTIVE state.", node.id)
return True
if stop_node_recovery():
return None
return True
# recover node after exhausting retries
LOG.info("Requesting node recovery: %s", node.id)
req = objects.NodeRecoverRequest(identity=node.id,
params=recover_action)
return False
return self.rpc_client.call(ctx, 'node_recover', req)
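# An illustrative rendering of the poll_url_healthy_response rule applied
# above: the configured string is used as a regular expression searched
# for anywhere in the polled body. The body and pattern are examples only.
import re

expected_resp_str = 'ACTIVE'
body = '{"name": "node-1", "status": "ACTIVE"}'

node_is_healthy = re.search(expected_resp_str, body) is not None
print(node_is_healthy)   # True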
def _poll_url(self, cluster_id, timeout, recover_action, params):
"""Routine to be executed for polling node status from a url
class HealthManager(service.Service):
:param cluster_id: The UUID of the cluster to be checked.
:param timeout: The maximum number of seconds to wait for recovery
action
:param recover_action: The health policy action name.
:param params: Parameters specific to poll url or recovery action
:returns: Nothing.
def __init__(self, engine_service, topic, version):
super(HealthManager, self).__init__()
self.TG = threadgroup.ThreadGroup()
self.engine_id = engine_service.engine_id
self.topic = topic
self.version = version
self.ctx = context.get_admin_context()
self.rpc_client = rpc_client.EngineClient()
self.rt = {
'registries': [],
}
self.health_check_types = defaultdict(lambda: [])
def _dummy_task(self):
"""A Dummy task that is queued on the health manager thread group.
The task is here so that the service always has something to wait()
on, or else the process will exit.
"""
start_time = timeutils.utcnow(True)
cluster = objects.Cluster.get(self.ctx, cluster_id, project_safe=False)
if not cluster:
LOG.warning("Cluster (%s) is not found.", cluster_id)
return _chase_up(start_time, timeout)
ctx = context.get_service_context(user_id=cluster.user,
project_id=cluster.project)
actions = []
# loop through nodes to poll url for each node
nodes = objects.Node.get_all_by_cluster(ctx, cluster_id)
for node in nodes:
action = self._check_url_and_recover_node(ctx, node,
recover_action, params)
if action:
actions.append(action)
for a in actions:
# wait for action to complete
res, reason = self._wait_for_action(ctx, a['action'], timeout)
if not res:
LOG.warning("Node recovery action %s did not complete "
"within specified timeout: %s", a['action'],
reason)
return _chase_up(start_time, timeout)
try:
self._load_runtime_registry()
except Exception as ex:
LOG.error("Failed when running '_load_runtime_registry': %s", ex)
def _add_listener(self, cluster_id, recover_action):
"""Routine to be executed for adding cluster listener.
@@ -438,12 +422,129 @@ class HealthManager(service.Service):
return self.TG.add_thread(ListenerProc, exchange, project, cluster_id,
recover_action)
def _recover_node(self, node_id, ctx, recover_action):
"""Recover node
:returns: Recover action
"""
try:
LOG.info("%s is requesting node recovery "
"for %s.", self.__class__.__name__, node_id)
req = objects.NodeRecoverRequest(identity=node_id,
params=recover_action)
return self.rpc_client.call(ctx, 'node_recover', req)
except Exception as ex:
LOG.error('Error when performing node recovery for %s: %s',
node_id, ex)
return None
def _wait_for_action(self, ctx, action_id, timeout):
req = objects.ActionGetRequest(identity=action_id)
with timeutils.StopWatch(timeout) as timeout_watch:
while not timeout_watch.expired():
action = self.rpc_client.call(ctx, 'action_get', req)
if action['status'] in [
consts.ACTION_SUCCEEDED, consts.ACTION_FAILED,
consts.ACTION_CANCELLED]:
break
time.sleep(2)
if action['status'] == consts.ACTION_SUCCEEDED:
return True, ""
if (action['status'] == consts.ACTION_FAILED or
action['status'] == consts.ACTION_CANCELLED):
return False, "Cluster check action failed or cancelled"
return False, ("Timeout while waiting for node recovery action to "
"finish")
def _add_health_check(self, cluster_id, health_check):
self.health_check_types[cluster_id].append(health_check)
def _execute_health_check(self, interval, cluster_id,
recover_action, recovery_cond,
node_update_timeout):
start_time = timeutils.utcnow(True)
try:
if cluster_id not in self.health_check_types:
LOG.error("Cluster (%s) is not found in health_check_types.",
self.cluster_id)
return _chase_up(start_time, interval)
if len(self.health_check_types[cluster_id]) == 0:
LOG.error("No health check types found for Cluster (%s).",
self.cluster_id)
return _chase_up(start_time, interval)
cluster = objects.Cluster.get(self.ctx, cluster_id,
project_safe=False)
if not cluster:
LOG.warning("Cluster (%s) is not found.", self.cluster_id)
return _chase_up(start_time, interval)
ctx = context.get_service_context(user_id=cluster.user,
project_id=cluster.project)
actions = []
# loop through nodes and run all health checks on each node
nodes = objects.Node.get_all_by_cluster(ctx, cluster_id)
for node in nodes:
node_is_healthy = True
if recovery_cond == consts.ANY_FAILED:
# recovery happens if any detection mode fails
# i.e. the inverse logic is that node is considered healthy
# if all detection modes pass
node_is_healthy = all(
hc.run_health_check(ctx, node)
for hc in self.health_check_types[cluster_id])
elif recovery_cond == consts.ALL_FAILED:
# recovery happens if all detection modes fail
# i.e. the inverse logic is that node is considered healthy
# if any detection mode passes
node_is_healthy = any(
hc.run_health_check(ctx, node)
for hc in self.health_check_types[cluster_id])
else:
raise Exception(
'{} is an invalid recovery conditional'.format(
recovery_cond))
if not node_is_healthy:
action = self._recover_node(node.id, ctx,
recover_action)
actions.append(action)
for a in actions:
# wait for action to complete
res, reason = self._wait_for_action(
ctx, a['action'], node_update_timeout)
if not res:
LOG.warning("Node recovery action %s did not complete "
"within specified timeout: %s", a['action'],
reason)
if len(actions) > 0:
LOG.info('Health check passed for all nodes in cluster %s.',
cluster_id)
except Exception as ex:
LOG.warning('Error while performing health check: %s', ex)
return _chase_up(start_time, interval)
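# A standalone rendering of the aggregation rule above; the per-mode
# health results are made up for the example.
ANY_FAILED, ALL_FAILED = 'ANY_FAILED', 'ALL_FAILED'


def node_is_healthy(mode_results, recovery_cond):
    if recovery_cond == ANY_FAILED:
        # recover if any mode failed, i.e. healthy only if all modes passed
        return all(mode_results)
    if recovery_cond == ALL_FAILED:
        # recover only if every mode failed, i.e. healthy if any mode passed
        return any(mode_results)
    raise ValueError('invalid recovery conditional: %s' % recovery_cond)


results = [True, False]   # e.g. status poll passed, URL poll failed
print(node_is_healthy(results, ANY_FAILED))   # False -> node gets recovered
print(node_is_healthy(results, ALL_FAILED))   # True  -> node left alone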
def _start_check(self, entry):
"""Routine for starting the checking for a cluster.
:param entry: A dict containing the data associated with the cluster.
:returns: An updated registry entry record.
"""
LOG.info('Enabling health check for cluster %s.', entry['cluster_id'])
cid = entry['cluster_id']
ctype = entry['check_type']
# Get the recover action parameter from the entry params
@@ -459,22 +560,24 @@ class HealthManager(service.Service):
for operation in rac:
recover_action['operation'] = operation.get('name')
if ctype == consts.NODE_STATUS_POLLING:
polling_types = [consts.NODE_STATUS_POLLING,
consts.NODE_STATUS_POLL_URL]
detection_types = ctype.split(',')
if all(check in polling_types for check in detection_types):
interval = min(entry['interval'], cfg.CONF.check_interval_max)
timer = self.TG.add_dynamic_timer(self._poll_cluster,
None, # initial_delay
None, # check_interval_max
cid, interval, recover_action)
for check in ctype.split(','):
self._add_health_check(cid, HealthCheckType.factory(
check, cid, interval, params))
timer = self.TG.add_dynamic_timer(self._execute_health_check,
None, None, interval, cid,
recover_action,
params['recovery_conditional'],
params['node_update_timeout'])
entry['timer'] = timer
elif ctype == consts.NODE_STATUS_POLL_URL:
interval = min(entry['interval'], cfg.CONF.check_interval_max)
timer = self.TG.add_dynamic_timer(self._poll_url,
None, # initial_delay
None, # check_interval_max
cid, interval,
recover_action, params)
entry['timer'] = timer
elif ctype == consts.LIFECYCLE_EVENTS:
elif (len(detection_types) == 1 and
detection_types[0] == consts.LIFECYCLE_EVENTS):
LOG.info("Start listening events for cluster (%s).", cid)
listener = self._add_listener(cid, recover_action)
if listener:
@@ -483,8 +586,8 @@ class HealthManager(service.Service):
LOG.warning("Error creating listener for cluster %s", cid)
return None
else:
LOG.warning("Cluster %(id)s check type %(type)s is invalid.",
{'id': cid, 'type': ctype})
LOG.error("Cluster %(id)s check type %(type)s is invalid.",
{'id': cid, 'type': ctype})
return None
return entry
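# A small sketch of the comma-separated check_type format split above;
# register_cluster() builds the same string from the detection_modes
# parameters. Values are illustrative.
detection_modes = [
    {'type': 'NODE_STATUS_POLLING'},
    {'type': 'NODE_STATUS_POLL_URL'},
]
check_type = ','.join(d['type'] for d in detection_modes)
print(check_type)             # NODE_STATUS_POLLING,NODE_STATUS_POLL_URL
print(check_type.split(','))  # back to the individual detection types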
@@ -495,10 +598,17 @@ class HealthManager(service.Service):
:param entry: A dict containing the data associated with the cluster.
:returns: ``None``.
"""
LOG.info('Disabling health check for cluster %s.', entry['cluster_id'])
timer = entry.get('timer', None)
if timer:
# stop timer
timer.stop()
# tell threadgroup to remove timer
self.TG.timer_done(timer)
if entry['cluster_id'] in self.health_check_types:
self.health_check_types.pop(entry['cluster_id'])
return
listener = entry.get('listener', None)
@@ -558,13 +668,13 @@ class HealthManager(service.Service):
"""Respond to confirm that the rpc service is still alive."""
return True
def register_cluster(self, ctx, cluster_id, check_type, interval=None,
params=None, enabled=True):
def register_cluster(self, ctx, cluster_id, interval=None,
node_update_timeout=None, params=None,
enabled=True):
"""Register cluster for health checking.
:param ctx: The context of notify request.
:param cluster_id: The ID of the cluster to be checked.
:param check_type: A string indicating the type of checks.
:param interval: An optional integer indicating the length of checking
periods in seconds.
:param dict params: Other parameters for the health check.
@@ -572,6 +682,17 @@ class HealthManager(service.Service):
"""
params = params or {}
# extract check_type from params
check_type = ""
if 'detection_modes' in params:
check_type = ','.join([
NodePollUrlHealthCheck._convert_detection_tuple(d).type
for d in params['detection_modes']
])
# add node_update_timeout to params
params['node_update_timeout'] = node_update_timeout
registry = objects.HealthRegistry.create(ctx, cluster_id, check_type,
interval, params,
self.engine_id,
@@ -603,6 +724,7 @@ class HealthManager(service.Service):
self._stop_check(entry)
self.rt['registries'].pop(i)
objects.HealthRegistry.delete(ctx, cluster_id)
LOG.debug('unregister done')
def enable_cluster(self, ctx, cluster_id, params=None):
for c in self.rt['registries']:
@@ -651,12 +773,12 @@ def notify(engine_id, method, **kwargs):
def register(cluster_id, engine_id=None, **kwargs):
params = kwargs.pop('params', {})
interval = kwargs.pop('interval', cfg.CONF.periodic_interval)
check_type = kwargs.pop('check_type', consts.NODE_STATUS_POLLING)
node_update_timeout = kwargs.pop('node_update_timeout', 300)
enabled = kwargs.pop('enabled', True)
return notify(engine_id, 'register_cluster',
cluster_id=cluster_id,
interval=interval,
check_type=check_type,
node_update_timeout=node_update_timeout,
params=params,
enabled=enabled)
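# The dict-to-namedtuple conversion performed by
# NodePollUrlHealthCheck._convert_detection_tuple(), demonstrated on its
# own with illustrative values.
from collections import namedtuple


def convert_detection_tuple(dictionary):
    return namedtuple('DetectionMode', dictionary.keys())(**dictionary)


mode = convert_detection_tuple({'type': 'NODE_STATUS_POLL_URL',
                                'poll_url': 'http://192.0.2.10/healthz'})
print(mode.type)       # NODE_STATUS_POLL_URL
print(mode.poll_url)   # http://192.0.2.10/healthz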


@@ -316,7 +316,7 @@ class Node(object):
self.index = -1
return True
def do_check(self, context):
def do_check(self, context, return_check_result=False):
if not self.physical_id:
return False
@@ -330,6 +330,9 @@ class Node(object):
self.set_status(context, consts.NS_ERROR, six.text_type(ex))
return False
if return_check_result:
return res
# Physical object is ACTIVE but for some reason the node status in
# senlin was WARNING. We only update the status_reason
if res:


@@ -10,7 +10,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
from oslo_config import cfg
from oslo_log import log as logging
from senlin.common import constraints
from senlin.common import consts
@@ -21,16 +23,21 @@ from senlin.common import schema
from senlin.engine import health_manager
from senlin.policies import base
LOG = logging.getLogger(__name__)
class HealthPolicy(base.Policy):
"""Policy for health management of a cluster."""
VERSION = '1.0'
VERSION = '1.1'
VERSIONS = {
'1.0': [
{'status': consts.EXPERIMENTAL, 'since': '2017.02'},
{'status': consts.SUPPORTED, 'since': '2018.06'},
]
],
'1.1': [
{'status': consts.SUPPORTED, 'since': '2018.09'}
],
}
PRIORITY = 600
@@ -55,20 +62,21 @@ class HealthPolicy(base.Policy):
KEYS = (DETECTION, RECOVERY) = ('detection', 'recovery')
_DETECTION_KEYS = (
DETECTION_TYPE, DETECTION_OPTIONS,
DETECTION_MODES, DETECTION_TYPE, DETECTION_OPTIONS, DETECTION_INTERVAL,
NODE_UPDATE_TIMEOUT, RECOVERY_CONDITIONAL
) = (
'type', 'options'
'detection_modes', 'type', 'options', 'interval',
'node_update_timeout', 'recovery_conditional'
)
_DETECTION_OPTIONS = (
DETECTION_INTERVAL, POLL_URL, POLL_URL_SSL_VERIFY,
POLL_URL, POLL_URL_SSL_VERIFY,
POLL_URL_CONN_ERROR_AS_UNHEALTHY, POLL_URL_HEALTHY_RESPONSE,
POLL_URL_RETRY_LIMIT, POLL_URL_RETRY_INTERVAL, NODE_UPDATE_TIMEOUT,
POLL_URL_RETRY_LIMIT, POLL_URL_RETRY_INTERVAL,
) = (
'interval', 'poll_url', 'poll_url_ssl_verify',
'poll_url', 'poll_url_ssl_verify',
'poll_url_conn_error_as_unhealthy', 'poll_url_healthy_response',
'poll_url_retry_limit', 'poll_url_retry_interval',
'node_update_timeout',
'poll_url_retry_limit', 'poll_url_retry_interval'
)
_RECOVERY_KEYS = (
@@ -96,68 +104,100 @@ class HealthPolicy(base.Policy):
DETECTION: schema.Map(
_('Policy aspect for node failure detection.'),
schema={
DETECTION_TYPE: schema.String(
_('Type of node failure detection.'),
DETECTION_INTERVAL: schema.Integer(
_("Number of seconds between pollings. Only "
"required when type is 'NODE_STATUS_POLLING' or "
"'NODE_STATUS_POLL_URL'."),
default=60,
),
NODE_UPDATE_TIMEOUT: schema.Integer(
_("Number of seconds since last node update to "
"wait before checking node health."),
default=300,
),
RECOVERY_CONDITIONAL: schema.String(
_("The conditional that determines when recovery should be"
" performed in case multiple detection modes are "
"specified. 'ALL_FAILED' means that all "
"detection modes have to return failed health checks "
"before a node is recovered. 'ANY_FAILED'"
" means that a failed health check with a single "
"detection mode triggers a node recovery."),
constraints=[
constraints.AllowedValues(consts.DETECTION_TYPES),
constraints.AllowedValues(
consts.RECOVERY_CONDITIONAL),
],
required=True,
),
DETECTION_OPTIONS: schema.Map(
schema={
DETECTION_INTERVAL: schema.Integer(
_("Number of seconds between pollings. Only "
"required when type is 'NODE_STATUS_POLLING' or "
"'NODE_STATUS_POLL_URL'."),
default=60,
),
POLL_URL: schema.String(
_("URL to poll for node status. See documentation "
"for valid expansion parameters. Only required "
"when type is 'NODE_STATUS_POLL_URL'."),
default='',
),
POLL_URL_SSL_VERIFY: schema.Boolean(
_("Whether to verify SSL when calling URL to poll "
"for node status. Only required when type is "
"'NODE_STATUS_POLL_URL'."),
default=True,
),
POLL_URL_CONN_ERROR_AS_UNHEALTHY: schema.Boolean(
_("Whether to treat URL connection errors as an "
"indication of an unhealthy node. Only required "
"when type is 'NODE_STATUS_POLL_URL'."),
default=True,
),
POLL_URL_HEALTHY_RESPONSE: schema.String(
_("String pattern in the poll URL response body "
"that indicates a healthy node. "
"Required when type is 'NODE_STATUS_POLL_URL'."),
default='',
),
POLL_URL_RETRY_LIMIT: schema.Integer(
_("Number of times to retry URL polling when its "
"return body is missing "
"POLL_URL_HEALTHY_RESPONSE string before a node "
"is considered down. Required when type is "
"'NODE_STATUS_POLL_URL'."),
default=3,
),
POLL_URL_RETRY_INTERVAL: schema.Integer(
_("Number of seconds between URL polling retries "
"before a node is considered down. "
"Required when type is 'NODE_STATUS_POLL_URL'."),
default=3,
),
NODE_UPDATE_TIMEOUT: schema.Integer(
_("Number of seconds since last node update to "
"wait before checking node health. "
"Required when type is 'NODE_STATUS_POLL_URL'."),
default=300,
),
},
default={}
default=consts.ANY_FAILED,
required=False,
),
DETECTION_MODES: schema.List(
_('List of node failure detection modes.'),
schema=schema.Map(
_('Node failure detection mode to try'),
schema={
DETECTION_TYPE: schema.String(
_('Type of node failure detection.'),
constraints=[
constraints.AllowedValues(
consts.DETECTION_TYPES),
],
required=True,
),
DETECTION_OPTIONS: schema.Map(
schema={
POLL_URL: schema.String(
_("URL to poll for node status. See "
"documentation for valid expansion "
"parameters. Only required "
"when type is "
"'NODE_STATUS_POLL_URL'."),
default='',
),
POLL_URL_SSL_VERIFY: schema.Boolean(
_("Whether to verify SSL when calling "
"URL to poll for node status. Only "
"required when type is "
"'NODE_STATUS_POLL_URL'."),
default=True,
),
POLL_URL_CONN_ERROR_AS_UNHEALTHY:
schema.Boolean(
_("Whether to treat URL connection "
"errors as an indication of an "
"unhealthy node. Only required "
"when type is "
"'NODE_STATUS_POLL_URL'."),
default=True,
),
POLL_URL_HEALTHY_RESPONSE: schema.String(
_("String pattern in the poll URL "
"response body that indicates a "
"healthy node. Required when type "
"is 'NODE_STATUS_POLL_URL'."),
default='',
),
POLL_URL_RETRY_LIMIT: schema.Integer(
_("Number of times to retry URL "
"polling when its return body is "
"missing POLL_URL_HEALTHY_RESPONSE "
"string before a node is considered "
"down. Required when type is "
"'NODE_STATUS_POLL_URL'."),
default=3,
),
POLL_URL_RETRY_INTERVAL: schema.Integer(
_("Number of seconds between URL "
"polling retries before a node is "
"considered down. Required when "
"type is 'NODE_STATUS_POLL_URL'."),
default=3,
),
},
default={}
),
}
)
)
},
required=True,
),
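# A hedged example of a version 1.1 spec exercising the detection schema
# above. The poll URL and healthy-response pattern are illustrative; the
# numeric values echo the schema defaults.
spec = {
    'type': 'senlin.policy.health',
    'version': '1.1',
    'properties': {
        'detection': {
            'interval': 60,
            'node_update_timeout': 300,
            'recovery_conditional': 'ANY_FAILED',
            'detection_modes': [
                {'type': 'NODE_STATUS_POLLING'},
                {'type': 'NODE_STATUS_POLL_URL',
                 'options': {
                     'poll_url': 'http://192.0.2.10:8080/health',
                     'poll_url_healthy_response': 'healthy',
                     'poll_url_retry_limit': 3,
                 }},
            ],
        },
        'recovery': {
            'fencing': ['COMPUTE'],
            'actions': [{'name': 'REBUILD'}],
        },
    },
}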
@@ -209,27 +249,44 @@ class HealthPolicy(base.Policy):
"action is RECREATE."),
default=False,
),
}
},
required=True,
),
}
def __init__(self, name, spec, **kwargs):
super(HealthPolicy, self).__init__(name, spec, **kwargs)
self.check_type = self.properties[self.DETECTION][self.DETECTION_TYPE]
self.interval = self.properties[self.DETECTION].get(
self.DETECTION_INTERVAL, 60)
options = self.properties[self.DETECTION][self.DETECTION_OPTIONS]
self.interval = options.get(self.DETECTION_INTERVAL, 60)
self.poll_url = options.get(self.POLL_URL, '')
self.poll_url_ssl_verify = options.get(self.POLL_URL_SSL_VERIFY, True)
self.poll_url_conn_error_as_unhealthy = options.get(
self.POLL_URL_CONN_ERROR_AS_UNHEALTHY, True)
self.poll_url_healthy_response = options.get(
self.POLL_URL_HEALTHY_RESPONSE, '')
self.poll_url_retry_limit = options.get(self.POLL_URL_RETRY_LIMIT, '')
self.poll_url_retry_interval = options.get(
self.POLL_URL_RETRY_INTERVAL, '')
self.node_update_timeout = options.get(self.NODE_UPDATE_TIMEOUT, 300)
self.node_update_timeout = self.properties[self.DETECTION].get(
self.NODE_UPDATE_TIMEOUT, 300)
self.recovery_conditional = self.properties[self.DETECTION].get(
self.RECOVERY_CONDITIONAL, consts.ANY_FAILED)
DetectionMode = namedtuple(
'DetectionMode',
[self.DETECTION_TYPE] + list(self._DETECTION_OPTIONS))
self.detection_modes = []
raw_modes = self.properties[self.DETECTION][self.DETECTION_MODES]
for mode in raw_modes:
options = mode[self.DETECTION_OPTIONS]
self.detection_modes.append(
DetectionMode(
mode[self.DETECTION_TYPE],
options.get(self.POLL_URL, ''),
options.get(self.POLL_URL_SSL_VERIFY, True),
options.get(self.POLL_URL_CONN_ERROR_AS_UNHEALTHY, True),
options.get(self.POLL_URL_HEALTHY_RESPONSE, ''),
options.get(self.POLL_URL_RETRY_LIMIT, ''),
options.get(self.POLL_URL_RETRY_INTERVAL, '')
)
)
recover_settings = self.properties[self.RECOVERY]
self.recover_actions = recover_settings[self.RECOVERY_ACTIONS]
@@ -257,6 +314,30 @@ class HealthPolicy(base.Policy):
cfg.CONF.health_check_interval_min}
raise exc.InvalidSpec(message=message)
# check valid detection types
polling_types = [consts.NODE_STATUS_POLLING,
consts.NODE_STATUS_POLL_URL]
has_valid_polling_types = all(
d.type in polling_types
for d in self.detection_modes
)
has_valid_lifecycle_type = (
len(self.detection_modes) == 1 and
self.detection_modes[0].type == consts.LIFECYCLE_EVENTS
)
if not has_valid_polling_types and not has_valid_lifecycle_type:
message = ("Invalid detection modes in health policy: %s" %
', '.join([d.type for d in self.detection_modes]))
raise exc.InvalidSpec(message=message)
if len(self.detection_modes) != len(set(self.detection_modes)):
message = ("Duplicate detection modes are not allowed in "
"health policy: %s" %
', '.join([d.type for d in self.detection_modes]))
raise exc.InvalidSpec(message=message)
# TODO(Qiming): Add detection of duplicated action names when
# support to list of actions is implemented.
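# A simplified standalone rendering of the combination rules enforced
# above. Note the real duplicate check compares full DetectionMode tuples,
# so this type-only version is an approximation.
POLLING_TYPES = ['NODE_STATUS_POLLING', 'NODE_STATUS_POLL_URL']


def modes_are_valid(types):
    all_polling = all(t in POLLING_TYPES for t in types)
    single_lifecycle = (len(types) == 1 and types[0] == 'LIFECYCLE_EVENTS')
    no_duplicates = len(types) == len(set(types))
    return (all_polling or single_lifecycle) and no_duplicates


print(modes_are_valid(['NODE_STATUS_POLLING', 'NODE_STATUS_POLL_URL']))  # True
print(modes_are_valid(['LIFECYCLE_EVENTS']))                             # True
print(modes_are_valid(['LIFECYCLE_EVENTS', 'NODE_STATUS_POLLING']))      # False
print(modes_are_valid(['NODE_STATUS_POLLING', 'NODE_STATUS_POLLING']))   # False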
@@ -283,40 +364,33 @@ class HealthPolicy(base.Policy):
return False, err_msg
kwargs = {
'check_type': self.check_type,
'interval': self.interval,
'node_update_timeout': self.node_update_timeout,
'params': {
'recover_action': self.recover_actions,
'poll_url': self.poll_url,
'poll_url_ssl_verify': self.poll_url_ssl_verify,
'poll_url_conn_error_as_unhealthy':
self.poll_url_conn_error_as_unhealthy,
'poll_url_healthy_response': self.poll_url_healthy_response,
'poll_url_retry_limit': self.poll_url_retry_limit,
'poll_url_retry_interval': self.poll_url_retry_interval,
'node_update_timeout': self.node_update_timeout,
'node_delete_timeout': self.node_delete_timeout,
'node_force_recreate': self.node_force_recreate,
'recovery_conditional': self.recovery_conditional,
},
'enabled': enabled
}
converted_detection_modes = [
d._asdict() for d in self.detection_modes
]
detection_mode = {'detection_modes': converted_detection_modes}
kwargs['params'].update(detection_mode)
health_manager.register(cluster.id, engine_id=None, **kwargs)
data = {
'check_type': self.check_type,
'interval': self.interval,
'poll_url': self.poll_url,
'poll_url_ssl_verify': self.poll_url_ssl_verify,
'poll_url_conn_error_as_unhealthy':
self.poll_url_conn_error_as_unhealthy,
'poll_url_healthy_response': self.poll_url_healthy_response,
'poll_url_retry_limit': self.poll_url_retry_limit,
'poll_url_retry_interval': self.poll_url_retry_interval,
'node_update_timeout': self.node_update_timeout,
'recovery_conditional': self.recovery_conditional,
'node_delete_timeout': self.node_delete_timeout,
'node_force_recreate': self.node_force_recreate,
}
data.update(detection_mode)
return True, self._build_policy_data(data)
@@ -327,7 +401,10 @@ class HealthPolicy(base.Policy):
:param cluster: The target cluster.
:returns: A tuple comprising the execution result and reason.
"""
health_manager.unregister(cluster.id)
ret = health_manager.unregister(cluster.id)
if not ret:
LOG.warning('Unregistering health manager for cluster %s '
'timed out.', cluster.id)
return True, ''
def pre_op(self, cluster_id, action, **args):


@@ -11,8 +11,10 @@
# under the License.
import copy
import eventlet
import inspect
from oslo_config import cfg
from oslo_context import context as oslo_context
from oslo_log import log as logging
from oslo_utils import timeutils
@@ -302,7 +304,7 @@ class Profile(object):
try:
return profile.do_check(obj)
except exc.InternalError as ex:
LOG.error(ex)
LOG.debug(ex)
return False
@classmethod
@@ -518,6 +520,13 @@ class Profile(object):
raise exc.EResourceOperation(op='recovering', type='node',
id=obj.id,
message=six.text_type(ex))
# pause to allow deleted resource to get reclaimed by nova
# this is needed to avoid a problem when the compute resources are
# at their quota limit. The deleted resource has to become available
# so that the new node can be created.
eventlet.sleep(cfg.CONF.batch_interval)
res = None
try:
res = self.do_create(obj)

File diff suppressed because it is too large


@@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
import copy
import mock
@@ -35,13 +36,15 @@ class TestHealthPolicy(base.SenlinTestCase):
self.spec = {
'type': 'senlin.policy.health',
'version': '1.0',
'version': '1.1',
'properties': {
'detection': {
'type': 'NODE_STATUS_POLLING',
'options': {
'interval': 60
}
"detection_modes": [
{
'type': 'NODE_STATUS_POLLING'
},
],
'interval': 60
},
'recovery': {
'fencing': ['COMPUTE'],
@@ -62,13 +65,94 @@ class TestHealthPolicy(base.SenlinTestCase):
self.hp = health_policy.HealthPolicy('test-policy', self.spec)
def test_policy_init(self):
self.assertIsNone(self.hp.id)
self.assertEqual('test-policy', self.hp.name)
self.assertEqual('senlin.policy.health-1.0', self.hp.type)
self.assertEqual('NODE_STATUS_POLLING', self.hp.check_type)
self.assertEqual(60, self.hp.interval)
DetectionMode = namedtuple(
'DetectionMode',
[self.hp.DETECTION_TYPE] + list(self.hp._DETECTION_OPTIONS))
detection_modes = [
DetectionMode(
type='NODE_STATUS_POLLING',
poll_url='',
poll_url_ssl_verify=True,
poll_url_conn_error_as_unhealthy=True,
poll_url_healthy_response='',
poll_url_retry_limit='',
poll_url_retry_interval=''
)
]
spec = {
'type': 'senlin.policy.health',
'version': '1.1',
'properties': {
'detection': {
"detection_modes": [
{
'type': 'NODE_STATUS_POLLING'
},
],
'interval': 60
},
'recovery': {
'fencing': ['COMPUTE'],
'actions': [
{'name': 'REBUILD'}
]
}
}
}
hp = health_policy.HealthPolicy('test-policy', spec)
self.assertIsNone(hp.id)
self.assertEqual('test-policy', hp.name)
self.assertEqual('senlin.policy.health-1.1', hp.type)
self.assertEqual(detection_modes, hp.detection_modes)
self.assertEqual(60, hp.interval)
self.assertEqual([{'name': 'REBUILD', 'params': None}],
self.hp.recover_actions)
hp.recover_actions)
def test_policy_init_ops(self):
spec = {
'type': 'senlin.policy.health',
'version': '1.1',
'properties': {
'detection': {
"detection_modes": [
{
'type': 'NODE_STATUS_POLLING'
},
{
'type': 'NODE_STATUS_POLL_URL'
},
],
'interval': 60
},
'recovery': {
'fencing': ['COMPUTE'],
'actions': [
{'name': 'REBUILD'}
]
}
}
}
operations = [None, 'ALL_FAILED', 'ANY_FAILED']
for op in operations:
# set operation in spec
if op:
spec['properties']['detection']['recovery_conditional'] = op
# test __init__
hp = health_policy.HealthPolicy('test-policy', spec)
# check result
self.assertIsNone(hp.id)
self.assertEqual('test-policy', hp.name)
self.assertEqual('senlin.policy.health-1.1', hp.type)
self.assertEqual(60, hp.interval)
self.assertEqual([{'name': 'REBUILD', 'params': None}],
hp.recover_actions)
def test_validate(self):
spec = copy.deepcopy(self.spec)
@@ -86,7 +170,7 @@ class TestHealthPolicy(base.SenlinTestCase):
def test_validate_valid_interval(self):
spec = copy.deepcopy(self.spec)
spec["properties"]["detection"]["options"]["interval"] = 20
spec["properties"]["detection"]["interval"] = 20
self.hp = health_policy.HealthPolicy('test-policy', spec)
cfg.CONF.set_override('health_check_interval_min', 20)
@@ -95,7 +179,7 @@ class TestHealthPolicy(base.SenlinTestCase):
def test_validate_invalid_interval(self):
spec = copy.deepcopy(self.spec)
spec["properties"]["detection"]["options"]["interval"] = 10
spec["properties"]["detection"]["interval"] = 10
self.hp = health_policy.HealthPolicy('test-policy', spec)
cfg.CONF.set_override('health_check_interval_min', 20)
@@ -116,19 +200,24 @@ class TestHealthPolicy(base.SenlinTestCase):
policy_data = {
'HealthPolicy': {
'data': {
'check_type': self.hp.check_type,
'interval': self.hp.interval,
'poll_url': '',
'poll_url_ssl_verify': True,
'poll_url_conn_error_as_unhealthy': True,
'poll_url_healthy_response': '',
'poll_url_retry_limit': 3,
'poll_url_retry_interval': 3,
'detection_modes': [
{
'type': 'NODE_STATUS_POLLING',
'poll_url': '',
'poll_url_ssl_verify': True,
'poll_url_conn_error_as_unhealthy': True,
'poll_url_healthy_response': '',
'poll_url_retry_limit': '',
'poll_url_retry_interval': ''
}
],
'node_update_timeout': 300,
'node_delete_timeout': 20,
'node_force_recreate': False
'node_force_recreate': False,
'recovery_conditional': 'ANY_FAILED'
},
'version': '1.0'
'version': '1.1'
}
}
@@ -136,19 +225,24 @@ class TestHealthPolicy(base.SenlinTestCase):
self.assertTrue(res)
self.assertEqual(policy_data, data)
kwargs = {
'check_type': self.hp.check_type,
'interval': self.hp.interval,
'node_update_timeout': 300,
'params': {
'recover_action': self.hp.recover_actions,
'poll_url': '',
'poll_url_ssl_verify': True,
'poll_url_conn_error_as_unhealthy': True,
'poll_url_healthy_response': '',
'poll_url_retry_limit': 3,
'poll_url_retry_interval': 3,
'node_update_timeout': 300,
'node_delete_timeout': 20,
'node_force_recreate': False
'node_force_recreate': False,
'recovery_conditional': 'ANY_FAILED',
'detection_modes': [
{
'type': 'NODE_STATUS_POLLING',
'poll_url': '',
'poll_url_ssl_verify': True,
'poll_url_conn_error_as_unhealthy': True,
'poll_url_healthy_response': '',
'poll_url_retry_limit': '',
'poll_url_retry_interval': ''
}
],
},
'enabled': True
}


@@ -57,6 +57,7 @@ senlin.policies =
senlin.policy.deletion-1.1 = senlin.policies.deletion_policy:DeletionPolicy
senlin.policy.scaling-1.0 = senlin.policies.scaling_policy:ScalingPolicy
senlin.policy.health-1.0 = senlin.policies.health_policy:HealthPolicy
senlin.policy.health-1.1 = senlin.policies.health_policy:HealthPolicy
senlin.policy.loadbalance-1.0 = senlin.policies.lb_policy:LoadBalancingPolicy
senlin.policy.loadbalance-1.1 = senlin.policies.lb_policy:LoadBalancingPolicy
senlin.policy.region_placement-1.0 = senlin.policies.region_placement:RegionPlacementPolicy