Merge "Add monitoring server to mistral-extra"

This commit is contained in:
Zuul 2023-08-08 09:08:26 +00:00 committed by Gerrit Code Review
commit 2e0139b9b2
17 changed files with 1083 additions and 1 deletion

View File

@@ -1,4 +1,5 @@
# Copyright 2020 - Nokia Corporation
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -48,9 +49,70 @@ openstack_actions_opts = [
),
]
OPENSTACK_ACTIONS_GROUP = 'openstack_actions'
monitoring_opts = [
cfg.BoolOpt(
'enabled',
default=False,
help=('Parameter for monitoring-mode.')
),
cfg.IntOpt(
'metric_collection_interval',
min=1,
default=30,
help=('Metric collection interval')
)
]
recovery_job_opts = [
cfg.BoolOpt(
'enabled',
default=True,
help=('Parameter for enabling recovery job.')
),
cfg.IntOpt(
'recovery_interval',
default=30,
min=1,
help=('Recovery interval')
),
cfg.IntOpt(
'hang_interval',
default=600,
min=1,
help=('Timeout for scheduled calls to be in processing state')
),
cfg.IntOpt(
'idle_task_timeout',
default=120,
min=1,
help=('Timeout for IDLE tasks to send run_task call again')
),
cfg.IntOpt(
'waiting_task_timeout',
default=600,
min=1,
        help=('Timeout for WAITING tasks to refresh their state again')
),
cfg.IntOpt(
'expired_subwf_task_timeout',
default=600,
min=1,
help=('Timeout for subwf tasks without created subworkflow')
),
cfg.IntOpt(
'stucked_subwf_task_timeout',
default=600,
min=1,
help=('Timeout for subwf tasks with completed subworkflow')
)
]
OPENSTACK_ACTIONS_GROUP = 'openstack_actions'
MONITORING_GROUP = 'monitoring'
RECOVERY_JOB_GROUP = 'recovery_job'
CONF = cfg.CONF
CONF.register_opts(openstack_actions_opts, group=OPENSTACK_ACTIONS_GROUP)
CONF.register_opt(os_actions_mapping_path)
CONF.register_opts(monitoring_opts, group=MONITORING_GROUP)
CONF.register_opts(recovery_job_opts, group=RECOVERY_JOB_GROUP)
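
For reference, a minimal mistral.conf sketch for the two groups registered above, assuming the option names and defaults shown in this hunk (the values themselves are illustrative):

[monitoring]
enabled = True
metric_collection_interval = 30

[recovery_job]
enabled = True
recovery_interval = 30
hang_interval = 600
idle_task_timeout = 120
waiting_task_timeout = 600
expired_subwf_task_timeout = 600
stucked_subwf_task_timeout = 600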

View File

View File

@@ -0,0 +1,100 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import threading
import eventlet
import six
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class Metric(object):
def __init__(self, measurement, fields, tags=None):
self.measurement = measurement
self.fields = fields
self.tags = tags
def __repr__(self):
return "measurement: {}, fields: {}, tags: {}".format(
self.measurement, self.fields, self.tags
)
def add_metric(metrics, group, tags={}, fields={}):
metrics.append(Metric(
measurement=group,
fields=fields,
tags=tags
))
@six.add_metaclass(abc.ABCMeta)
class MetricCollector(object):
"""Metric collector unit interface"""
@abc.abstractmethod
def collect(self):
raise NotImplementedError()
@six.add_metaclass(abc.ABCMeta)
class MonitoringJob(object):
def __init__(self, interval=60, first_execute=False):
self._interval = interval
self._job_execution_thread = threading.Thread(
target=self._execute_job
)
self._job_execution_thread.daemon = True
self._stopped = True
self._was_executed = first_execute
def get_name(self):
raise NotImplementedError()
def execute(self):
raise NotImplementedError()
def _execute_job(self):
while not self._stopped:
LOG.debug(
"Starting monitoring job. "
"[job_name=%s]", self.get_name()
)
if self._was_executed:
eventlet.sleep(self._interval)
try:
self._was_executed = True
self.execute()
except Exception:
LOG.exception(
"Monitoring job failed to unexpected exception "
"[job_name=%s]", self.get_name()
)
def start(self):
self._stopped = False
self._job_execution_thread.start()
def stop(self, graceful=False):
self._stopped = True
if graceful:
self._job_execution_thread.join()
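
To illustrate the contract of the MonitoringJob base class above (a daemon thread that sleeps for interval seconds and then calls execute(), logging under get_name()), here is a minimal hypothetical subclass; only the base-class API comes from the diff, the HeartbeatJob name and body are assumptions:

from mistral_extra.monitoring import base
from oslo_log import log as logging

LOG = logging.getLogger(__name__)


class HeartbeatJob(base.MonitoringJob):
    """Hypothetical job that logs a heartbeat once per minute."""

    def __init__(self):
        # first_execute=True makes the loop sleep one full interval
        # before the first run, which is how the recovery jobs in this
        # change use the flag.
        super(HeartbeatJob, self).__init__(interval=60, first_execute=True)

    def get_name(self):
        return "heartbeat"

    def execute(self):
        LOG.info("Monitoring heartbeat")


job = HeartbeatJob()
job.start()              # spawns the daemon execution thread
# ...
job.stop(graceful=True)  # joins the thread before returning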

View File

@@ -0,0 +1,175 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral.db.v2 import api as db_api
from mistral.workflow import states
from mistral_extra.monitoring import base
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
TASK_STATES = [
states.IDLE,
states.WAITING,
states.RUNNING,
states.RUNNING_DELAYED,
states.PAUSED,
states.SUCCESS,
states.CANCELLED,
states.ERROR,
]
WORKFLOW_STATES = [
states.RUNNING,
states.PAUSED,
states.SUCCESS,
states.CANCELLED,
states.ERROR,
]
ACTION_STATES = [
states.RUNNING,
states.PAUSED,
states.SUCCESS,
states.CANCELLED,
states.ERROR
]
DC_TARGET_METHODS = [
'mistral.engine.task_handler._refresh_task_state',
'mistral.engine.task_handler._scheduled_on_action_complete',
'mistral.engine.task_handler._scheduled_on_action_update',
'mistral.engine.workflow_handler._check_and_complete',
'mistral.engine.policies._continue_task',
'mistral.engine.policies._complete_task',
'mistral.engine.policies._fail_task_if_incomplete',
'mistral.services.maintenance._pause_executions',
'mistral.services.maintenance._resume_executions',
]
class MistralMetricCollector(base.MetricCollector):
_metrics_data = []
def collect(self):
with db_api.transaction():
self._update_action_count()
self._update_task_count()
self._update_workflow_count()
self._update_delayed_call_count()
return self._metrics_data
def _update_action_count(self):
counts = dict(db_api.get_action_execution_count_by_state())
action_count_tags = {
"name": "mistral_action_count",
"description": "Count of action by state",
"labels": ['namespace', 'state']
}
for state in ACTION_STATES:
if state not in counts:
base.add_metric(
self._metrics_data,
'mistral_entities',
fields={"state": str(state).lower(), "value": 0},
tags=action_count_tags
)
else:
base.add_metric(
self._metrics_data,
'mistral_entities',
fields={"state": str(state).lower(),
"value": counts[state]},
tags=action_count_tags
)
def _update_task_count(self):
counts = dict(db_api.get_task_execution_count_by_state())
task_count_tags = {
"name": "mistral_task_count",
"description": "Count of tasks by state",
"labels": ['namespace', 'state']
}
for state in TASK_STATES:
if state not in counts:
base.add_metric(
self._metrics_data,
'mistral_entities',
fields={"state": str(state), "value": 0},
tags=task_count_tags
)
else:
base.add_metric(
self._metrics_data,
'mistral_entities',
fields={"state": str(state),
"value": counts[state]},
tags=task_count_tags
)
def _update_workflow_count(self):
counts = dict(db_api.get_workflow_execution_count_by_state())
workflow_count_tags = {
"name": "mistral_workflow_count",
"description": "Count of workflows by state",
"labels": ['namespace', 'state']
}
for state in WORKFLOW_STATES:
if state not in counts:
base.add_metric(
self._metrics_data,
'mistral_entities',
fields={"state": str(state), "value": 0},
tags=workflow_count_tags
)
else:
base.add_metric(
self._metrics_data,
'mistral_entities',
fields={"state": str(state),
"value": counts[state]},
tags=workflow_count_tags
)
def _update_delayed_call_count(self):
counts = dict(db_api.get_delayed_calls_count_by_target())
delayed_calls_tags = {
"name": "mistral_delayed_calls_count",
"description": "Count of delayed calls by target method",
"labels": ['namespace', 'target']
}
for target in DC_TARGET_METHODS:
if target not in counts:
base.add_metric(
self._metrics_data,
"mistral_entities",
fields={"target": str(target).lower(), "value": 0},
tags=delayed_calls_tags
)
else:
base.add_metric(
self._metrics_data,
"mistral_entities",
fields={"target": str(target).lower(),
"value": counts[target]},
tags=delayed_calls_tags
)
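
For context, every _update_* helper above funnels its data through base.add_metric(), so each record shares one shape: a measurement name, a fields dict carrying the value plus label values, and a tags dict carrying the metric name, description and label list. A small hedged sketch of one such record (the count of 42 is illustrative):

from mistral_extra.monitoring import base

metrics = []

# Mirrors what _update_task_count() appends for a single state.
base.add_metric(
    metrics,
    'mistral_entities',
    fields={"state": "SUCCESS", "value": 42},
    tags={
        "name": "mistral_task_count",
        "description": "Count of tasks by state",
        "labels": ['namespace', 'state'],
    },
)

print(metrics[0])
# measurement: mistral_entities, fields: {'state': 'SUCCESS', 'value': 42}, tags: {...}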

View File

@@ -0,0 +1,67 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral.db.v2 import api as db_api
from mistral_extra.monitoring import base
import collections
import datetime
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class DelayedCallsRecoveryJob(base.MonitoringJob):
def __init__(self):
super(DelayedCallsRecoveryJob, self).__init__(
interval=CONF.recovery_job.recovery_interval, first_execute=True)
def get_name(self):
return "delayed calls recovery"
def execute(self):
with db_api.transaction():
self._process_delayed_calls(
template_success_message="Recovered calls",
fail_message="There are no calls for recovery"
)
def _process_delayed_calls(self, template_success_message, fail_message):
calls = db_api.get_calls_for_recovery(
datetime.timedelta(seconds=CONF.recovery_job.hang_interval)
)
if len(calls):
log = str(datetime.datetime.now())
recovered = collections.Counter()
for call in calls:
call.processing = False
recovered[call.target_method_name] += 1
log += "\n{message}. ID: {0}, key: {1}, " \
"factory method: {2}, target method: {3}, " \
"method_arguments: {4}, execution time: {5}, " \
"updated_at: {6}". \
format(call.id, call.key,
call.factory_method_path, call.target_method_name,
call.method_arguments, call.execution_time,
call.updated_at,
message=template_success_message)
LOG.info(log)
else:
LOG.debug(fail_message)

View File

@@ -0,0 +1,114 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api
from mistral.scheduler import base as sched_base
from mistral_extra.monitoring import base
import datetime
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class IdleTasksRecoveryJob(base.MonitoringJob):
def __init__(self):
super(IdleTasksRecoveryJob, self).__init__(
interval=CONF.recovery_job.recovery_interval, first_execute=True)
_task_run_path = (
'mistral.engine.task_handler._start_task'
)
def get_name(self):
return "idle tasks recovery"
def execute(self):
auth_ctx.set_ctx(
auth_ctx.MistralContext(
user=None,
tenant=None,
auth_token=None,
is_admin=True
)
)
with db_api.transaction():
self._process_idle_tasks()
def get_idle_tasks(self):
return db_api.get_expired_idle_task_executions(
timeout=datetime.timedelta(
seconds=CONF.recovery_job.idle_task_timeout
)
)
def _get_task_run_key(self, task_ex):
return 't_ex_r-%s' % task_ex.id
def _process_idle_tasks(self):
task_executions = self.get_idle_tasks()
sched = sched_base.get_system_scheduler()
for task_ex in task_executions:
auth_ctx.set_ctx(
auth_ctx.MistralContext(
user=None,
tenant=None,
auth_token=None,
is_admin=True
)
)
wf_ex = db_api.get_workflow_execution(
task_ex.workflow_execution_id)
if wf_ex.root_execution_id:
trace_uuid = wf_ex.root_execution_id
else:
trace_uuid = wf_ex.id
auth_ctx.set_ctx(
auth_ctx.MistralContext(
tenant=task_ex.project_id,
trace_uuid=trace_uuid
)
)
job_exist = sched.has_scheduled_jobs(
key=self._get_task_run_key(task_ex),
processing=False
)
if job_exist:
return
job = sched_base.SchedulerJob(
run_after=1.5,
func_name=self._task_run_path,
func_args={
'task_ex_id': task_ex.id,
'first_run':
task_ex['runtime_context']['recovery']['first_run'],
'waiting': False,
'triggered_by': task_ex.get('triggered_by'),
'rerun': task_ex['runtime_context']['recovery']['rerun'],
'reset': task_ex['runtime_context']['recovery']['reset']
},
key=self._get_task_run_key(task_ex)
)
sched.schedule(job)

View File

@@ -0,0 +1,52 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api
from mistral_extra.monitoring import base
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class NamedLocksRecoveryJob(base.MonitoringJob):
def __init__(self):
super(NamedLocksRecoveryJob, self).__init__(
interval=CONF.recovery_job.recovery_interval, first_execute=True)
def get_name(self):
return "named locks recovery"
def execute(self):
self._delete_frozen_named_locks()
def _delete_frozen_named_locks(self):
with db_api.transaction():
auth_ctx.set_ctx(
auth_ctx.MistralContext(
user=None,
tenant=None,
auth_token=None,
is_admin=True
)
)
deleted_count = db_api.delete_named_locks()
if deleted_count:
log = f'Number of named locks removed: {deleted_count}'
LOG.debug(log)
else:
LOG.debug('There are no frozen named locks present')

View File

@@ -0,0 +1,86 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api
from mistral.engine import workflow_handler as w_h
from mistral_extra.monitoring import base
import datetime
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class SubworkflowCompleteRecoveryJob(base.MonitoringJob):
def __init__(self):
super(SubworkflowCompleteRecoveryJob, self).__init__(
interval=CONF.recovery_job.stucked_subwf_task_timeout,
first_execute=True
)
def get_name(self):
return "complete subworkflow recovery"
def execute(self):
timeout = datetime.timedelta(
seconds=CONF.recovery_job.stucked_subwf_task_timeout
)
with db_api.transaction():
auth_ctx.set_ctx(
auth_ctx.MistralContext(
user=None,
tenant=None,
auth_token=None,
is_admin=True
)
)
task_executions = db_api.get_stucked_subwf_tasks(timeout)
LOG.info(f"Found {len(task_executions)} stucked subwf tasks.")
for task_id in task_executions:
auth_ctx.set_ctx(
auth_ctx.MistralContext(
user=None,
tenant=None,
auth_token=None,
is_admin=True
)
)
task_ex = db_api.get_task_execution(task_id)
wf_ex = db_api.get_workflow_execution(
task_ex.workflow_execution_id
)
if wf_ex.root_execution_id:
trace_uuid = wf_ex.root_execution_id
else:
trace_uuid = wf_ex.id
auth_ctx.set_ctx(
auth_ctx.MistralContext(
tenant=task_ex.project_id,
trace_uuid=trace_uuid
)
)
w_h._schedule_check_and_fix_integrity(
wf_ex,
CONF.engine.execution_integrity_check_delay
)

View File

@@ -0,0 +1,95 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api
from mistral.engine import task_handler as t_h
from mistral.scheduler import base as sched_base
from mistral_extra.monitoring import base
import datetime
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class SubworkflowStartRecoveryJob(base.MonitoringJob):
def __init__(self):
super(SubworkflowStartRecoveryJob, self).__init__(
interval=CONF.recovery_job.expired_subwf_task_timeout,
first_execute=True
)
def get_name(self):
return "start subworkflow recovery"
def execute(self):
timeout = datetime.timedelta(
seconds=CONF.recovery_job.expired_subwf_task_timeout
)
sched = sched_base.get_system_scheduler()
with db_api.transaction():
auth_ctx.set_ctx(
auth_ctx.MistralContext(
user=None,
tenant=None,
auth_token=None,
is_admin=True
)
)
task_executions = db_api.get_expired_subwf_tasks(timeout)
LOG.info(f"Found {len(task_executions)} expired subwf tasks.")
for task_id in task_executions:
auth_ctx.set_ctx(
auth_ctx.MistralContext(
user=None,
tenant=None,
auth_token=None,
is_admin=True
)
)
task_ex = db_api.get_task_execution(task_id)
wf_ex = db_api.get_workflow_execution(
task_ex.workflow_execution_id
)
if wf_ex.root_execution_id:
trace_uuid = wf_ex.root_execution_id
else:
trace_uuid = wf_ex.id
auth_ctx.set_ctx(
auth_ctx.MistralContext(
tenant=task_ex.project_id,
trace_uuid=trace_uuid
)
)
job = sched_base.SchedulerJob(
run_after=1.5,
func_name=t_h._REFRESH_TASK_STATE_PATH,
func_args={
'task_ex_id': task_ex.id,
'recovery': 'ERROR'
},
key=t_h._get_refresh_state_job_key(task_ex.id)
)
sched.schedule(job)

View File

@@ -0,0 +1,93 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api
from mistral.engine import task_handler as t_h
from mistral.scheduler import base as sched_base
from mistral_extra.monitoring import base
import datetime
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class WaitingTasksRecoveryJob(base.MonitoringJob):
def __init__(self):
super(WaitingTasksRecoveryJob, self).__init__(
interval=CONF.recovery_job.waiting_task_timeout,
first_execute=True
)
_refresh_task_state_path = (
'mistral.engine.task_handler._refresh_task_state'
)
def get_name(self):
return "waiting tasks recovery"
def execute(self):
timeout = datetime.timedelta(
seconds=CONF.recovery_job.waiting_task_timeout
)
sched = sched_base.get_system_scheduler()
with db_api.transaction():
auth_ctx.set_ctx(
auth_ctx.MistralContext(
user=None,
tenant=None,
auth_token=None,
is_admin=True
)
)
task_executions = db_api.get_expired_waiting_tasks(timeout)
LOG.info(f"Found {len(task_executions)} stucked waiting tasks.")
for task_ex in task_executions:
auth_ctx.set_ctx(
auth_ctx.MistralContext(
user=None,
tenant=None,
auth_token=None,
is_admin=True
)
)
wf_ex = db_api.get_workflow_execution(
task_ex.workflow_execution_id)
if wf_ex.root_execution_id:
trace_uuid = wf_ex.root_execution_id
else:
trace_uuid = wf_ex.id
auth_ctx.set_ctx(
auth_ctx.MistralContext(
tenant=task_ex.project_id,
trace_uuid=trace_uuid
)
)
job = sched_base.SchedulerJob(
run_after=1.5,
func_name=self._refresh_task_state_path,
func_args={'task_ex_id': task_ex.id},
key=t_h._get_refresh_state_job_key(task_ex.id)
)
sched.schedule(job)

View File

@@ -0,0 +1,48 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from alembic import config as alembic_cfg
from mistral.config import CONF
from mistral_extra.monitoring import monitoring_server
from oslo_log import log as logging
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
LOG = logging.getLogger(__name__)
def main():
config = alembic_cfg.Config(
os.path.join(os.path.dirname(__file__), 'alembic.ini')
)
config.set_main_option(
'script_location',
'mistral.db.sqlalchemy.migration:alembic_migrations'
)
# attach the Mistral conf to the Alembic conf
config.mistral_config = CONF
logging.register_options(CONF)
CONF(project='mistral')
logging.setup(CONF, 'Mistral')
monitoring = monitoring_server.MonitoringServer()
monitoring.start()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,113 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from flask import Flask
from flask import jsonify
from flask import Response
from importlib_metadata import entry_points
from mistral.service import base as service_base
from mistral_extra.monitoring.prometheus import format_to_prometheus
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def get_oslo_service(setup_profiler=True):
return MonitoringServer(
setup_profiler=setup_profiler
)
class MonitoringServer(service_base.MistralService):
def __init__(self, setup_profiler=True):
super(MonitoringServer, self).__init__(
'monitoring_group',
setup_profiler
)
collectors = entry_points(group='monitoring.metric_collector')
self._metric_collectors = [collector.load()()
for collector in collectors]
self._jobs = []
self._standard_tags = {}
self._metrics = {}
self._prometheus_formatted_metrics = []
self._last_updated = None
self._timedelta = datetime.timedelta(
seconds=CONF.monitoring.metric_collection_interval
)
self.app = Flask(__name__)
self.app.add_url_rule('/metrics', 'metrics', self.metrics)
self.app.add_url_rule('/health', 'health', self.health)
def collect_metrics(self, to_json=False):
now = datetime.datetime.now()
if not self._last_updated or self._outdated(now):
metrics = []
for collector in self._metric_collectors:
metrics.extend(collector.collect())
for metric in metrics:
metric.tags.update(self._standard_tags)
self._metrics = metrics
self._last_updated = now
if to_json:
return list(map(lambda x: x.__dict__, self._metrics))
return self._metrics
def _outdated(self, now):
return self._last_updated <= now - self._timedelta
def _get_prometheus_metrics(self):
metrics = self.collect_metrics(to_json=True)
pr_metrics = format_to_prometheus(metrics)
return ''.join([line.decode('utf-8') for line in pr_metrics])
def metrics(self):
with self.app.app_context():
m = self._get_prometheus_metrics()
return Response(m, 200, content_type='text/plain')
def health(self):
with self.app.app_context():
return jsonify({'status': 'UP'})
def _init_monitoring_jobs(self):
if CONF.recovery_job.enabled:
recovery_jobs = entry_points(group='monitoring.recovery_jobs')
for job in recovery_jobs:
recovery_job = job.load()()
self._jobs.append(recovery_job)
recovery_job.start()
def start(self):
super(MonitoringServer, self).start()
self._init_monitoring_jobs()
self.app.run(host="0.0.0.0", port=9090)
def stop(self, graceful=False):
super(MonitoringServer, self).stop()
for job in self._jobs:
job.stop(graceful)
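
Once start() is called, the Flask app above serves two plain HTTP endpoints on port 9090; a small standard-library sketch of polling them (host, port and routes are taken from the code above, the client itself is illustrative):

import json
import urllib.request

BASE = "http://localhost:9090"  # app.run(host="0.0.0.0", port=9090)

# Prometheus exposition text rendered by the /metrics route.
with urllib.request.urlopen(BASE + "/metrics") as resp:
    print(resp.read().decode("utf-8"))

# Liveness probe served by the /health route; returns {"status": "UP"}.
with urllib.request.urlopen(BASE + "/health") as resp:
    print(json.loads(resp.read()))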

View File

@@ -0,0 +1,58 @@
# Copyright 2023 - NetCracker Technology Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import prometheus_client
from prometheus_client import Gauge
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
_GRAPH = {}
def format_to_prometheus(metrics_list):
for metric in metrics_list:
metric_tags = metric['tags']
metric_name = metric_tags.get('name')
if not metric_name:
LOG.error('Unable to parse metric')
continue
metric_labels = metric_tags.get('labels')
metric_fields = metric['fields']
metric_value = metric_fields['value']
metric_label_values = {
label: metric_fields[label]
for label in metric_labels if label != 'namespace'
}
if 'namespace' in metric_labels:
metric_label_values['namespace'] = metric_tags.get('namespace')
if not _GRAPH.get(metric_name):
_GRAPH[metric_name] = Gauge(
metric_name,
metric_tags.get('description'),
metric_labels
)
_GRAPH[metric_name].labels(**metric_label_values).set(metric_value)
res = []
for key in _GRAPH:
res.append(prometheus_client.generate_latest(_GRAPH[key]))
return res
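
The formatter above expects the JSON-ified Metric shape that MonitoringServer.collect_metrics(to_json=True) produces: a 'tags' dict holding the gauge name, description and label list, and a 'fields' dict holding the value plus one entry per non-namespace label. A self-contained, hedged sketch feeding it one illustrative record (assumes mistral_extra and prometheus_client are installed):

from mistral_extra.monitoring.prometheus import format_to_prometheus

sample = [{
    'tags': {
        'name': 'mistral_task_count',
        'description': 'Count of tasks by state',
        'labels': ['namespace', 'state'],
        'namespace': 'default',
    },
    'fields': {'state': 'SUCCESS', 'value': 42},
}]

# Prints Prometheus exposition lines for the mistral_task_count gauge.
for line in format_to_prometheus(sample):
    print(line.decode('utf-8'))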

View File

@@ -36,3 +36,9 @@ python-zunclient>=3.4.0 # Apache-2.0
oauthlib>=0.6.2 # BSD
yaql>=1.1.3 # Apache-2.0
keystoneauth1>=3.18.0 # Apache-2.0
# monitoring dependencies
alembic>=1.7.7 # MIT
oslo.config>=8.7.0 # Apache-2.0
Flask>=1.0.2
prometheus_client>=0.15.0

View File

@@ -25,10 +25,23 @@ packages =
[entry_points]
mistral.generators =
generators = mistral_extra.actions.generator_factory:all_generators
console_scripts =
mistral-monitoring = mistral_extra.monitoring.launch:main
mistral.preinstalled_workflows =
workflows = mistral_extra.workflows:get_preinstalled_workflows
monitoring.metric_collector =
mistral = mistral_extra.monitoring.collectors.mistral_collector:MistralMetricCollector
monitoring.recovery_jobs =
delayed_calls = mistral_extra.monitoring.jobs.delayed_calls_recovery:DelayedCallsRecoveryJob
idle_tasks = mistral_extra.monitoring.jobs.idle_tasks_recovery:IdleTasksRecoveryJob
named_locks = mistral_extra.monitoring.jobs.named_locks_recovery:NamedLocksRecoveryJob
waiting_tasks = mistral_extra.monitoring.jobs.waiting_tasks_recovery:WaitingTasksRecoveryJob
sub_workflows_start = mistral_extra.monitoring.jobs.subworkflow_start_recovery:SubworkflowStartRecoveryJob
sub_workflow_complete = mistral_extra.monitoring.jobs.subworkflow_complete_recovery:SubworkflowCompleteRecoveryJob
[build_sphinx]
source-dir = doc/source
build-dir = doc/build