Add monitoring server to mistral-extra

This feature will allow users to monitor Mistral services and important metrics, such as operations related to tasks, executions, workflows, etc.
1. Added Monitoring server which can run similar to other mistral components by enabling monitoring and recovery jobs config options.
2. Monitoring plugin can collect metrics from the mistral and publish it in the Prometheus compatible format.
3. By using this monitoring plugin user can attach monitoring tools to the Mistral like Prometheus and Grafana to view metrics in a dashboard.
4. Added recovery jobs to recover from various failing or stuck conditions, e.g. removing frozen named locks, refreshing the state of tasks stuck in the WAITING state for a long time, handling expired sub-workflow tasks, stuck workflows, etc.

Implements: blueprint add-mistral-monitoring-plugin

Change-Id: Idbb6de9084504448befb9e346da4f458eb6c5a17
This commit is contained in:
Vasudeo Nimbekar 2023-05-23 16:39:10 +05:30
parent 44b3e5dff5
commit 3385694dd5
17 changed files with 1083 additions and 1 deletions

View File

@ -1,4 +1,5 @@
# Copyright 2020 - Nokia Corporation
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -48,9 +49,70 @@ openstack_actions_opts = [
),
]
OPENSTACK_ACTIONS_GROUP = 'openstack_actions'
# Options for the monitoring (metric collection) server.
monitoring_opts = [
    cfg.BoolOpt(
        'enabled',
        default=False,
        help='Parameter for monitoring-mode.'
    ),
    cfg.IntOpt(
        'metric_collection_interval',
        min=1,
        default=30,
        help='Metric collection interval'
    )
]
# Options for the background recovery jobs that heal stuck entities.
recovery_job_opts = [
    cfg.BoolOpt(
        'enabled',
        default=True,
        help='Parameter for enabling recovery job.'
    ),
    cfg.IntOpt(
        'recovery_interval',
        default=30,
        min=1,
        help='Recovery interval'
    ),
    cfg.IntOpt(
        'hang_interval',
        default=600,
        min=1,
        help='Timeout for scheduled calls to be in processing state'
    ),
    cfg.IntOpt(
        'idle_task_timeout',
        default=120,
        min=1,
        help='Timeout for IDLE tasks to send run_task call again'
    ),
    cfg.IntOpt(
        'waiting_task_timeout',
        default=600,
        min=1,
        help='Timeout for WAITING tasks to refresh its state again'
    ),
    cfg.IntOpt(
        'expired_subwf_task_timeout',
        default=600,
        min=1,
        help='Timeout for subwf tasks without created subworkflow'
    ),
    cfg.IntOpt(
        'stucked_subwf_task_timeout',
        default=600,
        min=1,
        help='Timeout for subwf tasks with completed subworkflow'
    )
]
# NOTE: OPENSTACK_ACTIONS_GROUP is already defined earlier in this module;
# the redundant re-assignment that used to be here was removed.
MONITORING_GROUP = 'monitoring'
RECOVERY_JOB_GROUP = 'recovery_job'
CONF = cfg.CONF
CONF.register_opts(openstack_actions_opts, group=OPENSTACK_ACTIONS_GROUP)
CONF.register_opt(os_actions_mapping_path)
CONF.register_opts(monitoring_opts, group=MONITORING_GROUP)
CONF.register_opts(recovery_job_opts, group=RECOVERY_JOB_GROUP)

View File

View File

@ -0,0 +1,100 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import threading
import eventlet
import six
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class Metric(object):
    """A single collected metric: a measurement name, its field values
    and optional metadata tags.
    """

    def __init__(self, measurement, fields, tags=None):
        self.measurement = measurement
        self.fields = fields
        self.tags = tags

    def __repr__(self):
        return "measurement: {}, fields: {}, tags: {}".format(
            self.measurement, self.fields, self.tags
        )


def add_metric(metrics, group, tags=None, fields=None):
    """Append a new Metric built from *group*, *fields* and *tags* to the
    *metrics* list.

    NOTE: the previous signature used mutable default arguments
    (``tags={}, fields={}``), which are shared between all calls and can
    leak state between metrics; fresh dicts are substituted instead.
    """
    metrics.append(Metric(
        measurement=group,
        fields={} if fields is None else fields,
        tags={} if tags is None else tags
    ))
class MetricCollector(abc.ABC):
    """Metric collector unit interface.

    Implementations gather metrics from a backend and return them as a
    list of Metric objects. NOTE: ``six.add_metaclass`` was replaced with
    ``abc.ABC`` (same ``ABCMeta`` metaclass) since the code is
    Python-3-only (the file already uses f-strings).
    """

    @abc.abstractmethod
    def collect(self):
        """Collect and return a list of Metric objects."""
        raise NotImplementedError()
class MonitoringJob(abc.ABC):
    """Base class for periodic monitoring/recovery jobs.

    Runs :meth:`execute` in a daemon thread every ``interval`` seconds
    until :meth:`stop` is called. NOTE: ``six.add_metaclass`` was
    replaced with ``abc.ABC`` (same ``ABCMeta`` metaclass) since the
    code is Python-3-only.
    """

    def __init__(self, interval=60, first_execute=False):
        """
        :param interval: seconds to sleep between executions.
        :param first_execute: if True, the first run is delayed by
            ``interval``; if False, it happens immediately on start.
        """
        self._interval = interval
        self._job_execution_thread = threading.Thread(
            target=self._execute_job
        )
        # Daemon thread: the process must not be kept alive by the job.
        self._job_execution_thread.daemon = True
        self._stopped = True
        self._was_executed = first_execute

    def get_name(self):
        """Return a human-readable job name (used in log messages)."""
        raise NotImplementedError()

    def execute(self):
        """Run one iteration of the job."""
        raise NotImplementedError()

    def _execute_job(self):
        # Loop until stop() flips the flag; any exception raised by
        # execute() is logged and the loop continues.
        while not self._stopped:
            LOG.debug(
                "Starting monitoring job. "
                "[job_name=%s]", self.get_name()
            )
            # Sleep before every run except the very first one (unless
            # first_execute requested an initial delay).
            if self._was_executed:
                eventlet.sleep(self._interval)
            try:
                self._was_executed = True
                self.execute()
            except Exception:
                LOG.exception(
                    "Monitoring job failed to unexpected exception "
                    "[job_name=%s]", self.get_name()
                )

    def start(self):
        """Start the background execution thread."""
        self._stopped = False
        self._job_execution_thread.start()

    def stop(self, graceful=False):
        """Signal the job to stop; optionally join the thread."""
        self._stopped = True
        if graceful:
            self._job_execution_thread.join()

View File

@ -0,0 +1,175 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral.db.v2 import api as db_api
from mistral.workflow import states
from mistral_extra.monitoring import base
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
# Task-execution states for which per-state counts are published.
TASK_STATES = [
    states.IDLE,
    states.WAITING,
    states.RUNNING,
    states.RUNNING_DELAYED,
    states.PAUSED,
    states.SUCCESS,
    states.CANCELLED,
    states.ERROR,
]
# Workflow-execution states for which per-state counts are published.
WORKFLOW_STATES = [
    states.RUNNING,
    states.PAUSED,
    states.SUCCESS,
    states.CANCELLED,
    states.ERROR,
]
# Action-execution states for which per-state counts are published.
ACTION_STATES = [
    states.RUNNING,
    states.PAUSED,
    states.SUCCESS,
    states.CANCELLED,
    states.ERROR
]
# Delayed-call target methods for which per-target counts are published.
DC_TARGET_METHODS = [
    'mistral.engine.task_handler._refresh_task_state',
    'mistral.engine.task_handler._scheduled_on_action_complete',
    'mistral.engine.task_handler._scheduled_on_action_update',
    'mistral.engine.workflow_handler._check_and_complete',
    'mistral.engine.policies._continue_task',
    'mistral.engine.policies._complete_task',
    'mistral.engine.policies._fail_task_if_incomplete',
    'mistral.services.maintenance._pause_executions',
    'mistral.services.maintenance._resume_executions',
]
class MistralMetricCollector(base.MetricCollector):
    """Collects per-state/per-target entity counts from the Mistral DB."""

    def __init__(self):
        # Buffer of metrics produced by the last collection cycle.
        self._metrics_data = []

    def collect(self):
        """Collect and return all Mistral entity metrics.

        BUG FIX: ``_metrics_data`` used to be a class-level list that was
        never cleared, so every collection cycle appended duplicates of
        all metrics (and the list was shared across instances). It is now
        instance state that is reset on each call.
        """
        self._metrics_data = []
        with db_api.transaction():
            self._update_action_count()
            self._update_task_count()
            self._update_workflow_count()
            self._update_delayed_call_count()
        return self._metrics_data

    def _update_action_count(self):
        # Publish action-execution counts per state; absent states are
        # reported as 0 so the metric series always exists.
        counts = dict(db_api.get_action_execution_count_by_state())
        action_count_tags = {
            "name": "mistral_action_count",
            "description": "Count of action by state",
            "labels": ['namespace', 'state']
        }
        for state in ACTION_STATES:
            base.add_metric(
                self._metrics_data,
                'mistral_entities',
                fields={"state": str(state).lower(),
                        "value": counts.get(state, 0)},
                tags=action_count_tags
            )

    def _update_task_count(self):
        # Publish task-execution counts per state.
        counts = dict(db_api.get_task_execution_count_by_state())
        task_count_tags = {
            "name": "mistral_task_count",
            "description": "Count of tasks by state",
            "labels": ['namespace', 'state']
        }
        for state in TASK_STATES:
            base.add_metric(
                self._metrics_data,
                'mistral_entities',
                fields={"state": str(state),
                        "value": counts.get(state, 0)},
                tags=task_count_tags
            )

    def _update_workflow_count(self):
        # Publish workflow-execution counts per state.
        counts = dict(db_api.get_workflow_execution_count_by_state())
        workflow_count_tags = {
            "name": "mistral_workflow_count",
            "description": "Count of workflows by state",
            "labels": ['namespace', 'state']
        }
        for state in WORKFLOW_STATES:
            base.add_metric(
                self._metrics_data,
                'mistral_entities',
                fields={"state": str(state),
                        "value": counts.get(state, 0)},
                tags=workflow_count_tags
            )

    def _update_delayed_call_count(self):
        # Publish delayed-call counts per target method.
        counts = dict(db_api.get_delayed_calls_count_by_target())
        delayed_calls_tags = {
            "name": "mistral_delayed_calls_count",
            "description": "Count of delayed calls by target method",
            "labels": ['namespace', 'target']
        }
        for target in DC_TARGET_METHODS:
            base.add_metric(
                self._metrics_data,
                "mistral_entities",
                fields={"target": str(target).lower(),
                        "value": counts.get(target, 0)},
                tags=delayed_calls_tags
            )

View File

@ -0,0 +1,67 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral.db.v2 import api as db_api
from mistral_extra.monitoring import base
import collections
import datetime
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class DelayedCallsRecoveryJob(base.MonitoringJob):
    """Resets delayed calls that have been in 'processing' state for
    longer than the configured hang interval.
    """

    def __init__(self):
        super(DelayedCallsRecoveryJob, self).__init__(
            interval=CONF.recovery_job.recovery_interval, first_execute=True)

    def get_name(self):
        return "delayed calls recovery"

    def execute(self):
        with db_api.transaction():
            self._process_delayed_calls(
                template_success_message="Recovered calls",
                fail_message="There are no calls for recovery"
            )

    def _process_delayed_calls(self, template_success_message, fail_message):
        hang_timeout = datetime.timedelta(
            seconds=CONF.recovery_job.hang_interval
        )
        stuck_calls = db_api.get_calls_for_recovery(hang_timeout)
        if not stuck_calls:
            LOG.debug(fail_message)
            return
        recovered = collections.Counter()
        # Build one multi-line log entry, starting with a timestamp and
        # followed by one line per recovered call.
        lines = [str(datetime.datetime.now())]
        for call in stuck_calls:
            # Clearing the flag makes the call eligible for pickup again.
            call.processing = False
            recovered[call.target_method_name] += 1
            lines.append(
                "{message}. ID: {0}, key: {1}, "
                "factory method: {2}, target method: {3}, "
                "method_arguments: {4}, execution time: {5}, "
                "updated_at: {6}".format(
                    call.id, call.key,
                    call.factory_method_path, call.target_method_name,
                    call.method_arguments, call.execution_time,
                    call.updated_at,
                    message=template_success_message
                )
            )
        LOG.info("\n".join(lines))

View File

@ -0,0 +1,114 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api
from mistral.scheduler import base as sched_base
from mistral_extra.monitoring import base
import datetime
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class IdleTasksRecoveryJob(base.MonitoringJob):
    """Re-schedules ``run_task`` for task executions stuck in IDLE state."""

    # Path of the function scheduled to (re)start an idle task.
    _task_run_path = (
        'mistral.engine.task_handler._start_task'
    )

    def __init__(self):
        super(IdleTasksRecoveryJob, self).__init__(
            interval=CONF.recovery_job.recovery_interval, first_execute=True)

    def get_name(self):
        return "idle tasks recovery"

    def execute(self):
        # Admin context: the job must see executions of all projects.
        auth_ctx.set_ctx(
            auth_ctx.MistralContext(
                user=None,
                tenant=None,
                auth_token=None,
                is_admin=True
            )
        )
        with db_api.transaction():
            self._process_idle_tasks()

    def get_idle_tasks(self):
        """Return task executions idle longer than the configured timeout."""
        return db_api.get_expired_idle_task_executions(
            timeout=datetime.timedelta(
                seconds=CONF.recovery_job.idle_task_timeout
            )
        )

    def _get_task_run_key(self, task_ex):
        # Scheduler key used to de-duplicate run-task jobs per task.
        return 't_ex_r-%s' % task_ex.id

    def _process_idle_tasks(self):
        task_executions = self.get_idle_tasks()
        sched = sched_base.get_system_scheduler()
        for task_ex in task_executions:
            # Reset to admin context before the cross-project DB read.
            auth_ctx.set_ctx(
                auth_ctx.MistralContext(
                    user=None,
                    tenant=None,
                    auth_token=None,
                    is_admin=True
                )
            )
            wf_ex = db_api.get_workflow_execution(
                task_ex.workflow_execution_id)
            # Trace by the root execution when the workflow is nested.
            if wf_ex.root_execution_id:
                trace_uuid = wf_ex.root_execution_id
            else:
                trace_uuid = wf_ex.id
            auth_ctx.set_ctx(
                auth_ctx.MistralContext(
                    tenant=task_ex.project_id,
                    trace_uuid=trace_uuid
                )
            )
            job_exist = sched.has_scheduled_jobs(
                key=self._get_task_run_key(task_ex),
                processing=False
            )
            if job_exist:
                # BUG FIX: the original code used 'return' here, which
                # aborted processing of ALL remaining idle tasks as soon
                # as one task already had a scheduled job. Skip just this
                # task instead.
                continue
            job = sched_base.SchedulerJob(
                run_after=1.5,
                func_name=self._task_run_path,
                func_args={
                    'task_ex_id': task_ex.id,
                    'first_run':
                        task_ex['runtime_context']['recovery']['first_run'],
                    'waiting': False,
                    'triggered_by': task_ex.get('triggered_by'),
                    'rerun': task_ex['runtime_context']['recovery']['rerun'],
                    'reset': task_ex['runtime_context']['recovery']['reset']
                },
                key=self._get_task_run_key(task_ex)
            )
            sched.schedule(job)

View File

@ -0,0 +1,52 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api
from mistral_extra.monitoring import base
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class NamedLocksRecoveryJob(base.MonitoringJob):
    """Removes frozen (orphaned) named locks from the database."""

    def __init__(self):
        super(NamedLocksRecoveryJob, self).__init__(
            interval=CONF.recovery_job.recovery_interval, first_execute=True)

    def get_name(self):
        return "named locks recovery"

    def execute(self):
        self._delete_frozen_named_locks()

    def _delete_frozen_named_locks(self):
        with db_api.transaction():
            # Admin context so locks of all projects are visible.
            admin_ctx = auth_ctx.MistralContext(
                user=None,
                tenant=None,
                auth_token=None,
                is_admin=True
            )
            auth_ctx.set_ctx(admin_ctx)
            deleted_count = db_api.delete_named_locks()
            if not deleted_count:
                LOG.debug('There are not any frozen named locks present')
            else:
                LOG.debug(f'No of Named locks was removed: {deleted_count}')

View File

@ -0,0 +1,86 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api
from mistral.engine import workflow_handler as w_h
from mistral_extra.monitoring import base
import datetime
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class SubworkflowCompleteRecoveryJob(base.MonitoringJob):
    """Schedules workflow integrity checks for tasks whose sub-workflow
    completed but whose own state was never updated.
    """

    def __init__(self):
        super(SubworkflowCompleteRecoveryJob, self).__init__(
            interval=CONF.recovery_job.stucked_subwf_task_timeout,
            first_execute=True
        )

    def get_name(self):
        return "complete subworkflow recovery"

    @staticmethod
    def _set_admin_context():
        # Admin context so executions of all projects are visible.
        auth_ctx.set_ctx(
            auth_ctx.MistralContext(
                user=None,
                tenant=None,
                auth_token=None,
                is_admin=True
            )
        )

    def execute(self):
        stuck_timeout = datetime.timedelta(
            seconds=CONF.recovery_job.stucked_subwf_task_timeout
        )
        with db_api.transaction():
            self._set_admin_context()
            task_executions = db_api.get_stucked_subwf_tasks(stuck_timeout)
            LOG.info(f"Found {len(task_executions)} stucked subwf tasks.")
            for task_id in task_executions:
                self._set_admin_context()
                task_ex = db_api.get_task_execution(task_id)
                wf_ex = db_api.get_workflow_execution(
                    task_ex.workflow_execution_id
                )
                # Trace by the root execution when the workflow is nested.
                trace_uuid = wf_ex.root_execution_id or wf_ex.id
                auth_ctx.set_ctx(
                    auth_ctx.MistralContext(
                        tenant=task_ex.project_id,
                        trace_uuid=trace_uuid
                    )
                )
                w_h._schedule_check_and_fix_integrity(
                    wf_ex,
                    CONF.engine.execution_integrity_check_delay
                )

View File

@ -0,0 +1,95 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api
from mistral.engine import task_handler as t_h
from mistral.scheduler import base as sched_base
from mistral_extra.monitoring import base
import datetime
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class SubworkflowStartRecoveryJob(base.MonitoringJob):
    """Refreshes the state of tasks whose sub-workflow execution was
    never created.
    """

    def __init__(self):
        super(SubworkflowStartRecoveryJob, self).__init__(
            interval=CONF.recovery_job.expired_subwf_task_timeout,
            first_execute=True
        )

    def get_name(self):
        return "start subworkflow recovery"

    @staticmethod
    def _set_admin_context():
        # Admin context so executions of all projects are visible.
        auth_ctx.set_ctx(
            auth_ctx.MistralContext(
                user=None,
                tenant=None,
                auth_token=None,
                is_admin=True
            )
        )

    def execute(self):
        expiry = datetime.timedelta(
            seconds=CONF.recovery_job.expired_subwf_task_timeout
        )
        sched = sched_base.get_system_scheduler()
        with db_api.transaction():
            self._set_admin_context()
            task_executions = db_api.get_expired_subwf_tasks(expiry)
            LOG.info(f"Found {len(task_executions)} expired subwf tasks.")
            for task_id in task_executions:
                self._set_admin_context()
                task_ex = db_api.get_task_execution(task_id)
                wf_ex = db_api.get_workflow_execution(
                    task_ex.workflow_execution_id
                )
                # Trace by the root execution when the workflow is nested.
                trace_uuid = wf_ex.root_execution_id or wf_ex.id
                auth_ctx.set_ctx(
                    auth_ctx.MistralContext(
                        tenant=task_ex.project_id,
                        trace_uuid=trace_uuid
                    )
                )
                refresh_job = sched_base.SchedulerJob(
                    run_after=1.5,
                    func_name=t_h._REFRESH_TASK_STATE_PATH,
                    func_args={
                        'task_ex_id': task_ex.id,
                        'recovery': 'ERROR'
                    },
                    key=t_h._get_refresh_state_job_key(task_ex.id)
                )
                sched.schedule(refresh_job)

View File

@ -0,0 +1,93 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api
from mistral.engine import task_handler as t_h
from mistral.scheduler import base as sched_base
from mistral_extra.monitoring import base
import datetime
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class WaitingTasksRecoveryJob(base.MonitoringJob):
    """Re-schedules a state refresh for tasks stuck in WAITING state."""

    # Path of the function scheduled to refresh a task's state.
    _refresh_task_state_path = (
        'mistral.engine.task_handler._refresh_task_state'
    )

    def __init__(self):
        super(WaitingTasksRecoveryJob, self).__init__(
            interval=CONF.recovery_job.waiting_task_timeout,
            first_execute=True
        )

    def get_name(self):
        return "waiting tasks recovery"

    @staticmethod
    def _set_admin_context():
        # Admin context so executions of all projects are visible.
        auth_ctx.set_ctx(
            auth_ctx.MistralContext(
                user=None,
                tenant=None,
                auth_token=None,
                is_admin=True
            )
        )

    def execute(self):
        wait_timeout = datetime.timedelta(
            seconds=CONF.recovery_job.waiting_task_timeout
        )
        sched = sched_base.get_system_scheduler()
        with db_api.transaction():
            self._set_admin_context()
            task_executions = db_api.get_expired_waiting_tasks(wait_timeout)
            LOG.info(f"Found {len(task_executions)} stucked waiting tasks.")
            for task_ex in task_executions:
                self._set_admin_context()
                wf_ex = db_api.get_workflow_execution(
                    task_ex.workflow_execution_id)
                # Trace by the root execution when the workflow is nested.
                trace_uuid = wf_ex.root_execution_id or wf_ex.id
                auth_ctx.set_ctx(
                    auth_ctx.MistralContext(
                        tenant=task_ex.project_id,
                        trace_uuid=trace_uuid
                    )
                )
                refresh_job = sched_base.SchedulerJob(
                    run_after=1.5,
                    func_name=self._refresh_task_state_path,
                    func_args={'task_ex_id': task_ex.id},
                    key=t_h._get_refresh_state_job_key(task_ex.id)
                )
                sched.schedule(refresh_job)

View File

@ -0,0 +1,48 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from alembic import config as alembic_cfg
from mistral.config import CONF
from mistral_extra.monitoring import monitoring_server
from oslo_log import log as logging
import urllib3
# Suppress noisy TLS warnings emitted by urllib3 when certificate
# verification is disabled.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
LOG = logging.getLogger(__name__)
def main():
    """Entry point for the standalone Mistral monitoring server."""
    alembic_ini = os.path.join(os.path.dirname(__file__), 'alembic.ini')
    config = alembic_cfg.Config(alembic_ini)
    config.set_main_option(
        'script_location',
        'mistral.db.sqlalchemy.migration:alembic_migrations'
    )
    # Attach the Mistral conf to the Alembic conf.
    config.mistral_config = CONF
    # Initialize oslo config and logging before starting the server.
    logging.register_options(CONF)
    CONF(project='mistral')
    logging.setup(CONF, 'Mistral')
    monitoring_server.MonitoringServer().start()


if __name__ == '__main__':
    main()

View File

@ -0,0 +1,113 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from flask import Flask
from flask import jsonify
from flask import Response
from importlib_metadata import entry_points
from mistral.service import base as service_base
from mistral_extra.monitoring.prometheus import format_to_prometheus
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def get_oslo_service(setup_profiler=True):
    """Factory used by the oslo service launcher to build the server."""
    return MonitoringServer(setup_profiler=setup_profiler)
class MonitoringServer(service_base.MistralService):
    """HTTP server exposing collected metrics in Prometheus format.

    Serves two endpoints:

    * ``/metrics`` - metrics in the Prometheus text format.
    * ``/health``  - liveness probe, returns ``{"status": "UP"}``.
    """

    def __init__(self, setup_profiler=True):
        super(MonitoringServer, self).__init__(
            'monitoring_group',
            setup_profiler
        )
        # Load all metric collector plugins registered under the
        # 'monitoring.metric_collector' entry-point group.
        collectors = entry_points(group='monitoring.metric_collector')
        self._metric_collectors = [collector.load()()
                                   for collector in collectors]
        self._jobs = []
        self._standard_tags = {}
        # BUG FIX: was initialized to {}; collect_metrics() always stores
        # a list here, so start with the correct type.
        self._metrics = []
        self._prometheus_formatted_metrics = []
        self._last_updated = None
        # Minimum age of the cache before a new collection is triggered.
        self._timedelta = datetime.timedelta(
            seconds=CONF.monitoring.metric_collection_interval
        )
        self.app = Flask(__name__)
        self.app.add_url_rule('/metrics', 'metrics', self.metrics)
        self.app.add_url_rule('/health', 'health', self.health)

    def collect_metrics(self, to_json=False):
        """Return cached metrics, refreshing them when outdated.

        :param to_json: if True, return plain dicts instead of Metric
            objects.
        """
        now = datetime.datetime.now()
        if not self._last_updated or self._outdated(now):
            metrics = []
            for collector in self._metric_collectors:
                metrics.extend(collector.collect())
            for metric in metrics:
                # BUG FIX: Metric.tags defaults to None; normalize it so
                # merging in the standard tags cannot raise.
                if metric.tags is None:
                    metric.tags = {}
                metric.tags.update(self._standard_tags)
            self._metrics = metrics
            self._last_updated = now
        if to_json:
            return [m.__dict__ for m in self._metrics]
        return self._metrics

    def _outdated(self, now):
        # True when the cache is at least one collection interval old.
        return self._last_updated <= now - self._timedelta

    def _get_prometheus_metrics(self):
        metrics = self.collect_metrics(to_json=True)
        pr_metrics = format_to_prometheus(metrics)
        return ''.join([line.decode('utf-8') for line in pr_metrics])

    def metrics(self):
        """Flask view for the /metrics endpoint."""
        with self.app.app_context():
            m = self._get_prometheus_metrics()
            return Response(m, 200, content_type='text/plain')

    def health(self):
        """Flask view for the /health endpoint."""
        with self.app.app_context():
            return jsonify({'status': 'UP'})

    def _init_monitoring_jobs(self):
        # Start recovery jobs only when explicitly enabled in config.
        if CONF.recovery_job.enabled:
            recovery_jobs = entry_points(group='monitoring.recovery_jobs')
            for job in recovery_jobs:
                recovery_job = job.load()()
                self._jobs.append(recovery_job)
                recovery_job.start()

    def start(self):
        super(MonitoringServer, self).start()
        self._init_monitoring_jobs()
        # NOTE(review): binds on all interfaces with a hard-coded port;
        # consider making host/port configurable and restricting the bind
        # address in hardened deployments.
        self.app.run(host="0.0.0.0", port=9090)

    def stop(self, graceful=False):
        super(MonitoringServer, self).stop()
        for job in self._jobs:
            job.stop(graceful)

View File

@ -0,0 +1,58 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import prometheus_client
from prometheus_client import Gauge
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
_GRAPH = {}
def format_to_prometheus(metrics_list):
    """Convert collected metric dicts into Prometheus exposition chunks.

    Lazily creates one Gauge per metric name (cached in the module-level
    _GRAPH registry), updates the labeled values and returns the
    generated text chunks (one bytes object per gauge).
    """
    for metric in metrics_list:
        metric_tags = metric['tags']
        metric_name = metric_tags.get('name')
        if not metric_name:
            LOG.error('Unable to parse metric')
            continue
        metric_labels = metric_tags.get('labels')
        metric_fields = metric['fields']
        metric_value = metric_fields['value']
        # The 'namespace' label value comes from the tags; every other
        # label value comes from the metric fields.
        label_values = {
            lbl: metric_fields[lbl]
            for lbl in metric_labels if lbl != 'namespace'
        }
        if 'namespace' in metric_labels:
            label_values['namespace'] = metric_tags.get('namespace')
        gauge = _GRAPH.get(metric_name)
        if not gauge:
            gauge = Gauge(
                metric_name,
                metric_tags.get('description'),
                metric_labels
            )
            _GRAPH[metric_name] = gauge
        gauge.labels(**label_values).set(metric_value)
    return [prometheus_client.generate_latest(g) for g in _GRAPH.values()]

View File

@ -36,3 +36,9 @@ python-zunclient>=3.4.0 # Apache-2.0
oauthlib>=0.6.2 # BSD
yaql>=1.1.3 # Apache-2.0
keystoneauth1>=3.18.0 # Apache-2.0
# monitoring dependencies
alembic>=1.7.7 # MIT
oslo.config>=8.7.0 # Apache-2.0
Flask>=1.0.2
prometheus_client>=0.15.0

View File

@ -25,10 +25,23 @@ packages =
[entry_points]
mistral.generators =
generators = mistral_extra.actions.generator_factory:all_generators
console_scripts =
mistral-monitoring = mistral_extra.monitoring.launch:main
mistral.preinstalled_workflows =
workflows = mistral_extra.workflows:get_preinstalled_workflows
monitoring.metric_collector =
mistral = mistral_extra.monitoring.collectors.mistral_collector:MistralMetricCollector
monitoring.recovery_jobs =
delayed_calls = mistral_extra.monitoring.jobs.delayed_calls_recovery:DelayedCallsRecoveryJob
idle_tasks = mistral_extra.monitoring.jobs.idle_tasks_recovery:IdleTasksRecoveryJob
named_locks = mistral_extra.monitoring.jobs.named_locks_recovery:NamedLocksRecoveryJob
waiting_tasks = mistral_extra.monitoring.jobs.waiting_tasks_recovery:WaitingTasksRecoveryJob
sub_workflows_start = mistral_extra.monitoring.jobs.subworkflow_start_recovery:SubworkflowStartRecoveryJob
sub_workflow_complete = mistral_extra.monitoring.jobs.subworkflow_complete_recovery:SubworkflowCompleteRecoveryJob
[build_sphinx]
source-dir = doc/source
build-dir = doc/build