Separation of Evaluator and InitializationStatus from ConsistencyEnforcer

Refactoring before parallel evaluator, evaluator is currently held by consistency to be used during the initialization step.
Initialization code moved to InitializationStatus and init thread is now started from the VitrageGraphService.

Change-Id: I551b5d863da0e875b08d3f45754d4be64707fc0a
This commit is contained in:
Idan Hefetz 2017-07-03 15:45:46 +00:00
parent 9c30206251
commit e2b964183c
14 changed files with 190 additions and 170 deletions

View File

@ -18,14 +18,13 @@ import sys
from oslo_service import service as os_service
from vitrage.api_handler import service as api_handler_svc
from vitrage.api_handler.service import VitrageApiHandlerService
from vitrage.common.constants import EntityCategory
from vitrage.datasources import OPENSTACK_CLUSTER
from vitrage.datasources.transformer_base import CLUSTER_ID
from vitrage import entity_graph
from vitrage.entity_graph.consistency import service as consistency_svc
from vitrage.entity_graph.initialization_status import InitializationStatus
from vitrage.entity_graph import service as entity_graph_svc
from vitrage.entity_graph.consistency.service import VitrageConsistencyService
from vitrage.entity_graph.service import VitrageGraphService
from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator
from vitrage.evaluator.scenario_repository import ScenarioRepository
from vitrage import service
@ -41,18 +40,17 @@ def main():
"""
conf = service.prepare_service()
init_status = InitializationStatus()
evaluator_queue, evaluator, e_graph = init(conf)
launcher = os_service.ServiceLauncher(conf)
launcher.launch_service(entity_graph_svc.VitrageGraphService(
conf, evaluator_queue, evaluator, e_graph, init_status))
graph_svc = VitrageGraphService(conf, evaluator_queue, e_graph, evaluator)
launcher.launch_service(graph_svc)
launcher.launch_service(api_handler_svc.VitrageApiHandlerService(
conf, e_graph, evaluator.scenario_repo))
launcher.launch_service(VitrageApiHandlerService(conf, e_graph,
evaluator.scenario_repo))
launcher.launch_service(consistency_svc.VitrageGraphConsistencyService(
conf, evaluator_queue, evaluator, e_graph, init_status))
launcher.launch_service(VitrageConsistencyService(conf, evaluator_queue,
e_graph))
launcher.wait()

View File

@ -36,43 +36,10 @@ class ConsistencyEnforcer(object):
def __init__(self,
conf,
evaluator_queue,
evaluator,
entity_graph,
initialization_status):
entity_graph):
self.conf = conf
self.evaluator_queue = evaluator_queue
self.evaluator = evaluator
self.graph = entity_graph
self.initialization_status = initialization_status
def initializing_process(self):
try:
LOG.info('Consistency Initializing Process - Started')
if not self._wait_for_action(
self.initialization_status.is_received_all_end_messages):
LOG.error('Maximum retries for consistency initializator '
'were done')
LOG.info('All end messages were received')
self.evaluator.enabled = True
timestamp = str(utcnow())
all_vertices = self.graph.get_vertices()
self._run_evaluator(all_vertices)
self._wait_for_processing_evaluator_events()
self._mark_old_deduced_alarms_as_deleted(timestamp)
self.initialization_status.status = \
self.initialization_status.FINISHED
LOG.info('Consistency Initializing Process - Finished')
except Exception as e:
LOG.exception('Error in deleting vertices from entity_graph: %s',
e)
def periodic_process(self):
try:
@ -125,34 +92,6 @@ class ConsistencyEnforcer(object):
return self._filter_vertices_to_be_deleted(vertices)
def _find_old_deduced_alarms(self, timestamp):
query = {
'and': [
{'==': {VProps.VITRAGE_CATEGORY: EntityCategory.ALARM}},
{'==': {VProps.VITRAGE_TYPE: VITRAGE_DATASOURCE}},
{'<': {VProps.VITRAGE_SAMPLE_TIMESTAMP: timestamp}}
]
}
return self.graph.get_vertices(query_dict=query)
def _run_evaluator(self, vertices):
start_time = time.time()
for vertex in vertices:
self.evaluator.process_event(None, vertex, True)
LOG.info('Run Evaluator on %s items - took %s', str(len(vertices)),
str(time.time() - start_time))
def _wait_for_processing_evaluator_events(self):
# wait for multiprocessing to put the events in the queue
time.sleep(1)
self._wait_for_action(self.evaluator_queue.empty)
def _mark_old_deduced_alarms_as_deleted(self, timestamp):
old_deduced_alarms = self._find_old_deduced_alarms(timestamp)
self._push_events_to_queue(old_deduced_alarms,
GraphAction.DELETE_ENTITY)
def _push_events_to_queue(self, vertices, action):
for vertex in vertices:
event = {

View File

@ -14,7 +14,6 @@
from oslo_log import log
from oslo_service import service as os_service
import threading
from vitrage.entity_graph.consistency.consistency_enforcer \
import ConsistencyEnforcer
@ -22,45 +21,35 @@ from vitrage.entity_graph.consistency.consistency_enforcer \
LOG = log.getLogger(__name__)
class VitrageGraphConsistencyService(os_service.Service):
class VitrageConsistencyService(os_service.Service):
def __init__(self,
conf,
evaluator_queue,
evaluator,
entity_graph,
initialization_status):
super(VitrageGraphConsistencyService, self).__init__()
entity_graph):
super(VitrageConsistencyService, self).__init__()
self.conf = conf
self.evaluator_queue = evaluator_queue
self.evaluator = evaluator
self.entity_graph = entity_graph
self.initialization_status = initialization_status
def start(self):
LOG.info("Vitrage Graph Consistency Service - Starting...")
LOG.info("Vitrage Consistency Service - Starting...")
super(VitrageGraphConsistencyService, self).start()
super(VitrageConsistencyService, self).start()
consistency_enf = ConsistencyEnforcer(self.conf,
self.evaluator_queue,
self.evaluator,
self.entity_graph,
self.initialization_status)
self.entity_graph)
self.tg.add_timer(self.conf.datasources.snapshots_interval,
consistency_enf.periodic_process,
initial_delay=60 +
self.conf.datasources.snapshots_interval)
initializing_process_thread = \
threading.Thread(target=consistency_enf.initializing_process)
initializing_process_thread.start()
LOG.info("Vitrage Graph Consistency Service - Started!")
LOG.info("Vitrage Consistency Service - Started!")
def stop(self, graceful=False):
LOG.info("Vitrage Graph Consistency Service - Stopping...")
LOG.info("Vitrage Consistency Service - Stopping...")
super(VitrageGraphConsistencyService, self).stop()
super(VitrageConsistencyService, self).stop()
LOG.info("Vitrage Graph Consistency Service - Stopped!")
LOG.info("Vitrage Consistency Service - Stopped!")

View File

@ -1,29 +0,0 @@
# Copyright 2016 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class InitializationStatus(object):
STARTED = 'started'
RECEIVED_ALL_END_MESSAGES = 'received_all_end_messages'
FINISHED = 'finished'
def __init__(self):
self.status = self.STARTED
self.end_messages = {}
def is_initialization_finished(self):
return self.status == self.FINISHED
def is_received_all_end_messages(self):
return self.status == self.RECEIVED_ALL_END_MESSAGES

View File

@ -30,23 +30,20 @@ from vitrage.entity_graph.processor.notifier import GraphNotifier
from vitrage.entity_graph.processor import processor_utils as PUtils
from vitrage.entity_graph.transformer_manager import TransformerManager
from vitrage.graph import Direction
from vitrage.graph.driver.networkx_graph import NXGraph
LOG = log.getLogger(__name__)
class Processor(processor.ProcessorBase):
def __init__(self, conf, initialization_status, e_graph=None,
uuid=False):
def __init__(self, conf, initialization_status, e_graph):
super(Processor, self).__init__()
self.conf = conf
self.transformer_manager = TransformerManager(self.conf)
self.state_manager = DatasourceInfoMapper(self.conf)
self._initialize_events_actions()
self.initialization_status = initialization_status
self.entity_graph = e_graph if e_graph is not None\
else NXGraph("Entity Graph", uuid=uuid)
self.entity_graph = e_graph
self._notifier = GraphNotifier(conf)
def process_event(self, event):
@ -221,17 +218,10 @@ class Processor(processor.ProcessorBase):
vertex, graph_vertex)
def handle_end_message(self, vertex, neighbors):
self.initialization_status.end_messages[vertex[VProps.VITRAGE_TYPE]] \
= True
self.initialization_status.handle_end_message(vertex)
if len(self.initialization_status.end_messages) == \
len(self.conf.datasources.types):
self.initialization_status.status = \
self.initialization_status.RECEIVED_ALL_END_MESSAGES
self.do_on_initialization_end()
def do_on_initialization_end(self):
if self._notifier.enabled:
def on_recieved_all_end_messages(self):
if self._notifier and self._notifier.enabled:
self.entity_graph.subscribe(self._notifier.notify_when_applicable)
LOG.info('Graph notifications subscription added')

View File

@ -19,6 +19,7 @@ import oslo_messaging
from oslo_service import service as os_service
from vitrage.entity_graph.processor import processor as proc
from vitrage.entity_graph.vitrage_init import VitrageInit
from vitrage import messaging
LOG = log.getLogger(__name__)
@ -29,16 +30,18 @@ class VitrageGraphService(os_service.Service):
def __init__(self,
conf,
evaluator_queue,
evaluator,
entity_graph,
initialization_status):
evaluator):
super(VitrageGraphService, self).__init__()
self.conf = conf
self.evaluator = evaluator
self.processor = proc.Processor(self.conf,
initialization_status,
e_graph=entity_graph)
self.evaluator_queue = evaluator_queue
self.graph = entity_graph
self.evaluator = evaluator
self.init = VitrageInit(conf, entity_graph, evaluator,
evaluator_queue)
self.processor = proc.Processor(self.conf,
self.init,
e_graph=entity_graph)
self.listener = self._create_datasources_event_listener(conf)
def start(self):
@ -46,6 +49,9 @@ class VitrageGraphService(os_service.Service):
super(VitrageGraphService, self).start()
self.tg.add_timer(0.1, self._process_event_non_blocking)
self.tg.add_thread(
self.init.initializing_process,
on_end_messages_func=self.processor.on_recieved_all_end_messages)
self.listener.start()
LOG.info("Vitrage Graph Service - Started!")

View File

@ -0,0 +1,115 @@
# Copyright 2016 - Nokia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
import time
from vitrage.common.constants import DatasourceAction
from vitrage.common.constants import DatasourceProperties as DSProps
from vitrage.common.constants import EntityCategory
from vitrage.common.constants import GraphAction
from vitrage.common.constants import VertexProperties as VProps
from vitrage.datasources.consistency import CONSISTENCY_DATASOURCE
from vitrage.utils.datetime import utcnow
LOG = log.getLogger(__name__)
class VitrageInit(object):
STARTED = 'started'
RECEIVED_ALL_END_MESSAGES = 'received_all_end_messages'
FINISHED = 'finished'
def __init__(self, conf, graph=None, evaluator=None, evaluator_queue=None):
self.conf = conf
self.graph = graph
self.evaluator = evaluator
self.evaluator_queue = evaluator_queue
self.status = self.STARTED
self.end_messages = {}
def initializing_process(self, on_end_messages_func):
try:
LOG.info('Init Started')
if not self._wait_for_all_end_messages():
LOG.error('Initialization - max retries reached')
else:
LOG.info('Initialization - All end messages were received')
on_end_messages_func()
timestamp = str(utcnow())
self.evaluator.run_evaluator()
if not self._wait_for_action(self.evaluator_queue.empty):
LOG.error('Evaluator Queue Not Empty')
self._mark_old_deduced_alarms_as_deleted(timestamp, self.graph,
self.evaluator_queue)
self.status = self.FINISHED
LOG.info('Init Finished')
except Exception as e:
LOG.exception('Init Failed: %s', e)
def handle_end_message(self, vertex):
self.end_messages[vertex[VProps.VITRAGE_TYPE]] = True
if len(self.end_messages) == len(self.conf.datasources.types):
self.status = self.RECEIVED_ALL_END_MESSAGES
def _wait_for_all_end_messages(self):
return self._wait_for_action(
lambda: self.status == self.RECEIVED_ALL_END_MESSAGES)
def _wait_for_action(self, function):
count_retries = 0
while True:
if count_retries >= \
self.conf.consistency.initialization_max_retries:
return False
if function():
return True
count_retries += 1
time.sleep(self.conf.consistency.initialization_interval)
def _mark_old_deduced_alarms_as_deleted(self, timestamp, graph, out_queue):
query = {
'and': [
{'==': {VProps.VITRAGE_CATEGORY: EntityCategory.ALARM}},
{'==': {VProps.VITRAGE_TYPE: VProps.VITRAGE_TYPE}},
{'<': {VProps.VITRAGE_SAMPLE_TIMESTAMP: timestamp}}
]
}
old_deduced_alarms = graph.get_vertices(query_dict=query)
self._push_events_to_queue(old_deduced_alarms,
GraphAction.DELETE_ENTITY,
out_queue)
def _push_events_to_queue(self, vertices, action, out_queue):
for vertex in vertices:
event = {
DSProps.ENTITY_TYPE: CONSISTENCY_DATASOURCE,
DSProps.DATASOURCE_ACTION: DatasourceAction.UPDATE,
DSProps.SAMPLE_DATE: str(utcnow()),
DSProps.EVENT_TYPE: action,
VProps.VITRAGE_ID: vertex[VProps.VITRAGE_ID],
VProps.ID: vertex.get(VProps.ID, None),
VProps.VITRAGE_TYPE: vertex[VProps.VITRAGE_TYPE],
VProps.VITRAGE_CATEGORY: vertex[VProps.VITRAGE_CATEGORY],
VProps.IS_REAL_VITRAGE_ID: True
}
out_queue.put(event)

View File

@ -13,12 +13,13 @@
# under the License.
from collections import namedtuple
import time
from oslo_log import log
from vitrage.datasources.listener_service import defaultdict
from vitrage.common.constants import EdgeProperties as EProps
from vitrage.common.constants import VertexProperties as VProps
from vitrage.datasources.listener_service import defaultdict
from vitrage.entity_graph.mappings.datasource_info_mapper \
import DatasourceInfoMapper
from vitrage.evaluator.actions.action_executor import ActionExecutor
@ -69,6 +70,15 @@ class ScenarioEvaluator(object):
def scenario_repo(self, scenario_repo):
self._scenario_repo = scenario_repo
def run_evaluator(self):
self.enabled = True
vertices = self._entity_graph.get_vertices()
start_time = time.time()
for vertex in vertices:
self.process_event(None, vertex, True)
LOG.info('Run Evaluator on %s items - took %s', str(len(vertices)),
str(time.time() - start_time))
def process_event(self, before, current, is_vertex, *args, **kwargs):
"""Notification of a change in the entity graph.

View File

@ -14,8 +14,6 @@
from vitrage.common.constants import DatasourceAction
from vitrage.common.constants import DatasourceProperties as DSProps
from vitrage.entity_graph.initialization_status import InitializationStatus
from vitrage.entity_graph.processor import processor as proc
from vitrage.tests.mocks import mock_driver
from vitrage.tests.unit.entity_graph.base import TestEntityGraphUnitBase
@ -27,8 +25,7 @@ class TestFunctionalBase(TestEntityGraphUnitBase):
events = self._create_mock_events()
if not processor:
processor = proc.Processor(conf, InitializationStatus(),
uuid=uuid)
processor = self.create_processor_and_graph(conf, uuid=uuid)
for event in events:
processor.process_event(event)

View File

@ -28,12 +28,13 @@ from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE
from vitrage.datasources.nova.zone import NOVA_ZONE_DATASOURCE
from vitrage.entity_graph.consistency.consistency_enforcer \
import ConsistencyEnforcer
from vitrage.entity_graph.initialization_status import InitializationStatus
from vitrage.entity_graph.processor.processor import Processor
from vitrage.entity_graph.vitrage_init import VitrageInit
from vitrage.evaluator.actions.evaluator_event_transformer \
import VITRAGE_DATASOURCE
from vitrage.evaluator.scenario_evaluator import ScenarioEvaluator
from vitrage.evaluator.scenario_repository import ScenarioRepository
from vitrage.graph.driver.networkx_graph import NXGraph
import vitrage.graph.utils as graph_utils
from vitrage.tests.functional.base import TestFunctionalBase
from vitrage.tests.mocks import utils
@ -70,7 +71,6 @@ class TestConsistencyFunctional(TestFunctionalBase):
@classmethod
def setUpClass(cls):
super(TestConsistencyFunctional, cls).setUpClass()
cls.initialization_status = InitializationStatus()
cls.conf = cfg.ConfigOpts()
cls.conf.register_opts(cls.CONSISTENCY_OPTS, group='consistency')
cls.conf.register_opts(cls.PROCESSOR_OPTS, group='entity_graph')
@ -78,8 +78,11 @@ class TestConsistencyFunctional(TestFunctionalBase):
cls.conf.register_opts(cls.DATASOURCES_OPTS, group='datasources')
cls.load_datasources(cls.conf)
cls.graph = NXGraph("Entity Graph", uuid=True)
cls.initialization_status = VitrageInit(cls.conf, cls.graph)
cls.processor = Processor(cls.conf, cls.initialization_status,
uuid=True)
cls.graph)
cls.event_queue = queue.Queue()
scenario_repo = ScenarioRepository(cls.conf)
cls.evaluator = ScenarioEvaluator(cls.conf,
@ -89,9 +92,7 @@ class TestConsistencyFunctional(TestFunctionalBase):
cls.consistency_enforcer = ConsistencyEnforcer(
cls.conf,
cls.event_queue,
cls.evaluator,
cls.processor.entity_graph,
cls.initialization_status)
cls.processor.entity_graph)
@unittest.skip("test_initializing_process skipping")
def test_initializing_process(self):
@ -110,7 +111,8 @@ class TestConsistencyFunctional(TestFunctionalBase):
# eventlet.spawn(self._process_events)
# processor_thread = threading.Thread(target=self._process_events)
# processor_thread.start()
self.consistency_enforcer.initializing_process()
self.initialization_status.initializing_process(
self.processor.on_recieved_all_end_messages)
self._process_events()
# Test Assertions

View File

@ -17,10 +17,8 @@ from oslo_config import cfg
from vitrage.common.constants import DatasourceAction as DSAction
from vitrage.common.constants import GraphAction
from vitrage.common.constants import VertexProperties as VProps
from vitrage.entity_graph.initialization_status import InitializationStatus
from vitrage.entity_graph.mappings.operational_resource_state import \
OperationalResourceState
from vitrage.entity_graph.processor import processor as proc
from vitrage.tests.functional.base import TestFunctionalBase
@ -37,8 +35,7 @@ class TestDatasourceInfoMapperFunctional(TestFunctionalBase):
def test_state_on_update(self):
# setup
processor = proc.Processor(self.conf, InitializationStatus(),
uuid=True)
processor = self.create_processor_and_graph(self.conf, uuid=True)
event = self._create_event(spec_type='INSTANCE_SPEC',
datasource_action=DSAction.INIT_SNAPSHOT)

View File

@ -24,8 +24,9 @@ from vitrage.datasources.neutron.port import NEUTRON_PORT_DATASOURCE
from vitrage.datasources.nova.host import NOVA_HOST_DATASOURCE
from vitrage.datasources.nova.instance import NOVA_INSTANCE_DATASOURCE
from vitrage.datasources.nova.zone import NOVA_ZONE_DATASOURCE
from vitrage.entity_graph.initialization_status import InitializationStatus
from vitrage.entity_graph.processor import processor as proc
from vitrage.entity_graph.vitrage_init import VitrageInit
from vitrage.graph.driver.networkx_graph import NXGraph
import vitrage.graph.utils as graph_utils
from vitrage.opts import register_opts
from vitrage.tests import base
@ -74,7 +75,7 @@ class TestEntityGraphUnitBase(base.BaseTest):
events = self._create_mock_events()
if not processor:
processor = proc.Processor(conf, InitializationStatus())
processor = self.create_processor_and_graph(conf, uuid=False)
for event in events:
processor.process_event(event)
@ -116,8 +117,7 @@ class TestEntityGraphUnitBase(base.BaseTest):
# add instance entity with host
if processor is None:
processor = proc.Processor(self.conf, InitializationStatus(),
uuid=True)
processor = self.create_processor_and_graph(self.conf, True)
vertex, neighbors, event_type = processor.transformer_manager\
.transform(event)
@ -125,6 +125,12 @@ class TestEntityGraphUnitBase(base.BaseTest):
return vertex, neighbors, processor
@staticmethod
def create_processor_and_graph(conf, uuid):
e_graph = NXGraph("Entity Graph", uuid=uuid)
init = VitrageInit(conf)
return proc.Processor(conf, init, e_graph)
@staticmethod
def _create_event(spec_type=None,
datasource_action=None,

View File

@ -20,10 +20,8 @@ from vitrage.common.constants import EdgeProperties as EProps
from vitrage.common.constants import GraphAction
from vitrage.common.constants import VertexProperties as VProps
from vitrage.datasources.transformer_base import Neighbor
from vitrage.entity_graph.initialization_status import InitializationStatus
from vitrage.entity_graph.mappings.operational_resource_state import \
OperationalResourceState
from vitrage.entity_graph.processor import processor as proc
from vitrage.entity_graph.processor import processor_utils as PUtils
import vitrage.graph.utils as graph_utils
from vitrage.tests.unit.entity_graph.base import TestEntityGraphUnitBase
@ -51,8 +49,7 @@ class TestProcessor(TestEntityGraphUnitBase):
def test_process_event(self):
# check create instance event
processor = proc.Processor(self.conf, InitializationStatus(),
uuid=True)
processor = self.create_processor_and_graph(self.conf, uuid=True)
event = self._create_event(spec_type=self.INSTANCE_SPEC,
datasource_action=DSAction.INIT_SNAPSHOT)
processor.process_event(event)

View File

@ -19,8 +19,9 @@ from oslo_config import cfg
from vitrage.common.constants import DatasourceAction
from vitrage.common.constants import DatasourceProperties
from vitrage.entity_graph.initialization_status import InitializationStatus
from vitrage.entity_graph.processor import processor as proc
from vitrage.entity_graph.vitrage_init import VitrageInit
from vitrage.graph.driver.networkx_graph import NXGraph
from vitrage.tests.mocks import mock_driver as mock_sync
from vitrage.tests.mocks import utils
@ -37,7 +38,9 @@ class BaseMock(testtools.TestCase):
conf = cfg.ConfigOpts()
conf.register_opts(self.PROCESSOR_OPTS, group='entity_graph')
events = self._create_mock_events()
processor = proc.Processor(conf, InitializationStatus())
e_graph = NXGraph("Entity Graph", uuid=False)
init = VitrageInit(conf)
processor = proc.Processor(conf, init, e_graph)
for event in events:
processor.process_event(event)