diff --git a/murano/api/v1/environments.py b/murano/api/v1/environments.py index 8a7e89607..6f128ef80 100644 --- a/murano/api/v1/environments.py +++ b/murano/api/v1/environments.py @@ -12,6 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime + import jsonpatch from oslo_db import exception as db_exc from oslo_log import log as logging @@ -30,6 +32,7 @@ from murano.db.services import core_services from murano.db.services import environments as envs from murano.db.services import sessions as session_services from murano.db import session as db_session +from murano.engine.system import status_reporter from murano.services import states from murano.utils import check_env from murano.utils import check_session @@ -41,6 +44,11 @@ API_NAME = 'Environments' class Controller(object): + + def __init__(self, *args, **kwargs): + super(Controller, self).__init__(*args, **kwargs) + self._notifier = status_reporter.Notification() + @request_statistics.stats_count(API_NAME, 'Index') def index(self, request): all_tenants = request.GET.get('all_tenants', 'false').lower() == 'true' @@ -161,20 +169,26 @@ class Controller(object): def delete(self, request, environment_id): target = {"environment_id": environment_id} policy.check('delete_environment', request.context, target) + environment = check_env(request, environment_id) + if request.GET.get('abandon', '').lower() == 'true': - check_env(request, environment_id) - LOG.debug('Environments:Abandon ' - .format(id=environment_id)) + LOG.debug( + 'Environments:Abandon '.format(id=environment_id)) envs.EnvironmentServices.remove(environment_id) else: - LOG.debug('Environments:Delete ' - .format(id=environment_id)) + LOG.debug( + 'Environments:Delete '.format(id=environment_id)) sessions_controller = sessions.Controller() - session = sessions_controller.configure(request, environment_id) + session = sessions_controller.configure( + request, environment_id) session_id = session['id'] envs.EnvironmentServices.delete(environment_id, session_id) sessions_controller.deploy(request, environment_id, session_id) + env = environment.to_dict() + env['deleted'] = datetime.datetime.utcnow() + self._notifier.report('environment.delete.end', env) + @request_statistics.stats_count(API_NAME, 'LastStatus') @verify_env def last(self, request, environment_id): diff --git a/murano/common/config.py b/murano/common/config.py index 1c4e492f0..24c019706 100644 --- a/murano/common/config.py +++ b/murano/common/config.py @@ -173,7 +173,12 @@ networking_opts = [ stats_opts = [ cfg.IntOpt('period', default=5, help=_('Statistics collection interval in minutes.' - 'Default value is 5 minutes.')) + 'Default value is 5 minutes.')), + cfg.IntOpt('env_audit_period', default=60, + help=_('Environment audit interval in minutes. ' + 'Default value is 60 minutes.')), + cfg.BoolOpt('env_audit_enabled', default=True, + help=_('Whether environment audit events enabled')) ] engine_opts = [ diff --git a/murano/common/server.py b/murano/common/server.py index f64c7f05a..0bc921624 100644 --- a/murano/common/server.py +++ b/murano/common/server.py @@ -29,6 +29,7 @@ from murano.db import models from murano.db.services import environments from murano.db.services import instances from murano.db import session +from murano.engine.system import status_reporter from murano.services import states CONF = cfg.CONF @@ -124,6 +125,12 @@ class ResultEndpoint(object): .format(env_id=environment.id, tenant_id=environment.tenant_id, services=services)) + if action_name == 'Deployment': + env = environment.to_dict() + env["deployment_started"] = deployment.started + env["deployment_finished"] = deployment.finished + status_reporter.get_notifier().report( + 'environment.deploy.end', env) def notification_endpoint_wrapper(priority='info'): diff --git a/murano/common/statservice.py b/murano/common/statservice.py index 6de14f761..e958fa4cb 100644 --- a/murano/common/statservice.py +++ b/murano/common/statservice.py @@ -23,10 +23,16 @@ from oslo_config import cfg from oslo_log import log as logging from oslo_service import service import psutil +from sqlalchemy import desc from murano.api import v1 +from murano.api.v1.deployments import set_dep_state from murano.api.v1 import request_statistics +from murano.db import models +from murano.db.services import environments as envs from murano.db.services import stats as db_stats +from murano.db import session as db_session +from murano.engine.system import status_reporter CONF = cfg.CONF @@ -41,10 +47,12 @@ class StatsCollectingService(service.Service): self._hostname = socket.gethostname() self._stats_db = db_stats.Statistics() self._prev_time = time.time() + self._notifier = status_reporter.Notification() def start(self): super(StatsCollectingService, self).start() self.tg.add_thread(self._collect_stats_loop) + self.tg.add_thread(self._report_env_stats_loop) def stop(self): super(StatsCollectingService, self).stop() @@ -55,6 +63,12 @@ class StatsCollectingService(service.Service): self.update_stats() eventlet.sleep(period) + def _report_env_stats_loop(self): + env_audit_period = CONF_STATS.env_audit_period * 60 + while True: + self.report_env_stats() + eventlet.sleep(env_audit_period) + def update_stats(self): LOG.debug("Updating statistic information.") LOG.debug("Stats object: {stats}".format(stats=v1.stats)) @@ -95,3 +109,27 @@ class StatsCollectingService(service.Service): except Exception as e: LOG.exception("Failed to get statistics object from a " "database. {error_code}".format(error_code=e)) + + def report_env_stats(self): + LOG.debug("Reporting env stats") + try: + environments = envs.EnvironmentServices.get_environments_by({}) + + for env in environments: + deployments = get_env_deployments(env.id) + success_deployments = [d for d in deployments + if d['state'] == "success"] + if success_deployments: + self._notifier.report('environment.exists', env.to_dict()) + except Exception: + LOG.exception("Failed to report existing envs") + + +def get_env_deployments(environment_id): + unit = db_session.get_session() + query = unit.query(models.Task).filter_by( + environment_id=environment_id).order_by(desc(models.Task.created)) + result = query.all() + deployments = [ + set_dep_state(deployment, unit).to_dict() for deployment in result] + return deployments diff --git a/murano/engine/system/status_reporter.py b/murano/engine/system/status_reporter.py index 89a502988..0b2ced941 100644 --- a/murano/engine/system/status_reporter.py +++ b/murano/engine/system/status_reporter.py @@ -14,8 +14,10 @@ # limitations under the License. from datetime import datetime +import socket from oslo_config import cfg +from oslo_log import log as logging import oslo_messaging as messaging import six @@ -23,6 +25,7 @@ from murano.common import uuidutils from murano.dsl import dsl CONF = cfg.CONF +LOG = logging.getLogger(__name__) @dsl.name('io.murano.system.StatusReporter') @@ -63,3 +66,66 @@ class StatusReporter(object): @dsl.name('report_error') def report_error(self, instance, msg): self._report(instance, msg, None, 'error') + + +class Notification(object): + transport = None + + def __init__(self): + if not CONF.stats.env_audit_enabled: + return + + if Notification.transport is None: + Notification.transport = messaging.get_notification_transport( + CONF) + self._notifier = messaging.Notifier( + Notification.transport, + publisher_id=('murano.%s' % socket.gethostname()), + driver='messaging') + + def _report(self, event_type, environment, level='info'): + if not CONF.stats.env_audit_enabled: + return + + if 'deleted' in environment: + deleted_at = environment['deleted'].isoformat() + else: + deleted_at = None + + body = { + 'id': environment['id'], + 'level': level, + 'environment_id': environment['id'], + 'tenant_id': environment['tenant_id'], + 'created_at': environment.get('created').isoformat(), + 'deleted_at': deleted_at, + 'launched_at': None, + 'timestamp': datetime.utcnow().isoformat() + } + + optional_fields = ("deployment_started", "deployment_finished") + for f in optional_fields: + body[f] = environment.get(f, None) + + LOG.debug("Sending out notification, type=%s, body=%s, level=%s", + event_type, body, level) + + self._notifier.info({}, 'murano.%s' % event_type, + body) + + def report(self, event_type, environment): + self._report(event_type, environment) + + def report_error(self, event_type, environment): + self._report(event_type, environment, 'error') + + +NOTIFIER = None + + +def get_notifier(): + global NOTIFIER + if not NOTIFIER: + NOTIFIER = Notification() + + return NOTIFIER diff --git a/murano/tests/unit/base.py b/murano/tests/unit/base.py index 6855f33cf..3d58dfd21 100644 --- a/murano/tests/unit/base.py +++ b/murano/tests/unit/base.py @@ -41,3 +41,15 @@ class MuranoWithDBTestCase(MuranoTestCase): self.override_config('connection', "sqlite://", group='database') db_api.setup_db() self.addCleanup(db_api.drop_db) + + self.override_config('env_audit_enabled', False, group='stats') + + +class MuranoNotifyWithDBTestCase(MuranoWithDBTestCase): + + def setUp(self): + super(MuranoNotifyWithDBTestCase, self).setUp() + self.override_config('connection', "sqlite://", group='database') + db_api.setup_db() + self.addCleanup(db_api.drop_db) + self.override_config('env_audit_enabled', True, group='stats') diff --git a/murano/tests/unit/common/test_server.py b/murano/tests/unit/common/test_server.py index 7efeece96..45dc35fd5 100644 --- a/murano/tests/unit/common/test_server.py +++ b/murano/tests/unit/common/test_server.py @@ -30,12 +30,13 @@ class ServerTest(base.MuranoTestCase): cls.result_endpoint = server.ResultEndpoint() cls.dummy_context = test_utils.dummy_context() + @mock.patch('murano.common.server.status_reporter.get_notifier') @mock.patch('murano.common.server.LOG') @mock.patch('murano.common.server.get_last_deployment') @mock.patch('murano.common.server.models') @mock.patch('murano.common.server.session') def test_process_result(self, mock_db_session, mock_models, - mock_last_deployment, mock_log): + mock_last_deployment, mock_log, mock_notifier): test_result = { 'model': { 'Objects': { @@ -79,6 +80,9 @@ class ServerTest(base.MuranoTestCase): .format(env_id=mock_env.id, tenant_id=mock_env.tenant_id, services=test_result['model']['Objects']['services'])) + mock_notifier.return_value.report.assert_called_once_with( + 'environment.deploy.end', + mock_db_session.get_session().query().get(mock_env.id).to_dict()) @mock.patch('murano.common.server.LOG') @mock.patch('murano.common.server.get_last_deployment') diff --git a/murano/tests/unit/common/test_statservice.py b/murano/tests/unit/common/test_statservice.py index de90900d9..5d9cab8b9 100644 --- a/murano/tests/unit/common/test_statservice.py +++ b/murano/tests/unit/common/test_statservice.py @@ -13,10 +13,16 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime as dt import mock import time +from oslo_utils import timeutils + from murano.common import statservice +from murano.db import models +from murano.db import session as db_session +from murano.services import states from murano.tests.unit import base @@ -37,7 +43,7 @@ class StatsCollectingServiceTest(base.MuranoTestCase): def test_service_start_and_stop(self, _): self.assertEqual(0, len(self.service.tg.threads)) self.service.start() - self.assertEqual(1, len(self.service.tg.threads)) + self.assertEqual(2, len(self.service.tg.threads)) self.service.stop() self.assertEqual(0, len(self.service.tg.threads)) @@ -93,3 +99,156 @@ class StatsCollectingServiceTest(base.MuranoTestCase): mock_log.exception.assert_called_once_with( "Failed to get statistics object from a " "database. {error_code}".format(error_code='test_error_code')) + + +class EnvReportingTest(base.MuranoNotifyWithDBTestCase): + + def setUp(self): + super(EnvReportingTest, self).setUp() + self.service = statservice.StatsCollectingService() + + @mock.patch('murano.common.statservice.status_reporter.' + 'Notification.report') + def test_report_env_stats(self, mock_notifier): + now = timeutils.utcnow() + later = now + dt.timedelta(minutes=1) + + session = db_session.get_session() + + environment1 = models.Environment( + name='test_environment1', tenant_id='test_tenant_id1', + version=2, id='test_env_id_1', + created=now, + updated=later, + description={ + 'Objects': { + 'applications': ['app1'], + 'services': ['service1'] + } + } + ) + environment2 = models.Environment( + name='test_environment2', tenant_id='test_tenant_id2', + version=1, id='test_env_id_2', + created=now, + updated=later, + description={ + 'Objects': { + 'applications': ['app2'], + 'services': ['service3'] + } + } + ) + environment3 = models.Environment( + name='test_environment3', tenant_id='test_tenant_id2', + version=1, id='test_env_id_3', + created=now, + updated=later, + description={} + ) + + session_1 = models.Session( + environment=environment1, user_id='test_user_id', + description={}, + state=states.SessionState.DEPLOYED, + version=1 + ) + + session_2 = models.Session( + environment=environment2, user_id='test_user_id', + description={}, + state=states.SessionState.DEPLOYED, + version=0 + ) + + session_3 = models.Session( + environment=environment3, user_id='test_user_id', + description={}, + state=states.SessionState.DEPLOY_FAILURE, + version=1 + ) + + task_1 = models.Task( + id='task_id_1', + environment=environment1, + description={}, + created=now, + started=now, + updated=later, + finished=later + ) + + task_2 = models.Task( + id='task_id_2', + environment=environment2, + description={}, + created=now, + started=now, + updated=later, + finished=later + ) + + task_3 = models.Task( + id='task_id_3', + environment=environment3, + description={}, + created=now, + started=now, + updated=later, + finished=later + ) + + status_1 = models.Status( + id='status_id_1', + task_id='task_id_1', + text='Deployed', + level='info' + ) + + status_2 = models.Status( + id='status_id_2', + task_id='task_id_2', + text='Deployed', + level='info' + ) + + status_3 = models.Status( + id='status_id_3', + task_id='task_id_3', + text='Something was wrong', + level='error' + ) + + session.add_all([environment1, environment2, environment3]) + session.add_all([session_1, session_2, session_3]) + session.add_all([task_1, task_2, task_3]) + session.add_all([status_1, status_2, status_3]) + + session.flush() + + self.service.report_env_stats() + + self.assertEqual(mock_notifier.call_count, 2) + + dict_env_1 = {'version': 2, + 'updated': later, + 'tenant_id': u'test_tenant_id1', + 'created': now, + 'description_text': u'', + 'status': 'ready', + 'id': u'test_env_id_1', + 'name': u'test_environment1'} + + dict_env_2 = {'version': 1, + 'updated': later, + 'tenant_id': u'test_tenant_id2', + 'created': now, + 'description_text': u'', + 'status': 'ready', + 'id': u'test_env_id_2', + 'name': u'test_environment2'} + + calls = [mock.call('environment.exists', dict_env_1), + mock.call('environment.exists', dict_env_2)] + + mock_notifier.assert_has_calls(calls) diff --git a/releasenotes/notes/implement-environment-audit-reports-23bb8009d1dfaecc.yaml b/releasenotes/notes/implement-environment-audit-reports-23bb8009d1dfaecc.yaml new file mode 100644 index 000000000..72ff5ad58 --- /dev/null +++ b/releasenotes/notes/implement-environment-audit-reports-23bb8009d1dfaecc.yaml @@ -0,0 +1,11 @@ +--- +features: + - | + + Add notifications about environment events that are required for + tracking. These are AMQP notifications and oslo.messaging library + is used for sending them. + The follow event types are provided: environment.deploy.end, + environment.delete.end, environment.exists + There are 2 new configuration options controlling these notifications: + stats.env_audit_period, env_audit_enabled.