diff --git a/entropy/__main__.py b/entropy/__main__.py index a4c7bcd..cf75100 100644 --- a/entropy/__main__.py +++ b/entropy/__main__.py @@ -54,6 +54,8 @@ def _add_to_list(engine, script_type, script_name, **script_args): } backend.add_script(script_type, data) return True + except KeyError: + LOG.exception("No %s script called %s", script_type, script_name) except Exception: LOG.exception("Could not register %s script %s", script_type, script_name) diff --git a/entropy/engine.py b/entropy/engine.py index 7339469..dd0af86 100644 --- a/entropy/engine.py +++ b/entropy/engine.py @@ -24,7 +24,10 @@ import tempfile from concurrent import futures as cf import croniter +from kombu import BrokerConnection +from kombu.common import maybe_declare from kombu import Exchange +from kombu.pools import producers from kombu import Queue import pause import six @@ -84,6 +87,14 @@ class Engine(object): # State related variables self._state = states.ENABLED + # Variables for mq. + self._mq_args = { + 'mq_user': cfg_data['mq_user'], + 'mq_password': cfg_data['mq_password'], + 'mq_host': cfg_data['mq_host'], + 'mq_port': cfg_data['mq_port'] + } + LOG.info('Created engine obj %s', self.name) # TODO(praneshp): Move to utils? @@ -249,7 +260,10 @@ class Engine(object): repairs_to_delete.append(repair) LOG.info('Will add new repairs: %s', new_repairs) LOG.info('Will nuke repairs: %s', repairs_to_delete) - self.futures.extend(self.start_react_scripts(new_repairs)) + if new_repairs: + self.futures.extend(self.start_react_scripts(new_repairs)) + if repairs_to_delete: + self.stop_react_scripts(repairs_to_delete) def start_watchdog(self): LOG.debug('Watchdog mapping is: ', self._watchdog_event_fn) @@ -286,6 +300,45 @@ class Engine(object): repairs = self._backend_driver.get_repairs() return repairs + def stop_react_scripts(self, repairs_to_stop): + # current react scripts + LOG.info("Currently running react scripts: %s", self._repairs) + for repair in repairs_to_stop: + self.stop_react(repair) + # react scripts at the end + LOG.info("Currently running react scripts: %s", self._repairs) + + def stop_react(self, repair): + LOG.info("Stopping react script %s", repair) + # Get what the keywords are + routing_key = self._known_routing_keys[repair] + # remove the repair script from our known set. + self._known_routing_keys.pop(repair) + # put out a special message, repair script will see that and die. + self._send_killer_message(routing_key) + LOG.info("Stopped react script %s", repair) + + def _send_killer_message(self, routing_key): + # NOTE(praneshp): routing_key is a list + # TODO(praneshp): we'll figure out a way to do this better. + connection = BrokerConnection('amqp://%(mq_user)s:%(mq_password)s@' + '%(mq_host)s:%(mq_port)s//' % + self._mq_args) + message = {'From': 'repair_killer', + 'Date': str(datetime.datetime.now().isoformat())} + + with producers[connection].acquire(block=True) as producer: + try: + maybe_declare(self.entropy_exchange, producer.channel) + for rk in routing_key: + producer.publish(message, + exchange=self.entropy_exchange, + routing_key=rk, + serializer='json') + LOG.debug("React killer published message") + except Exception: + LOG.exception("React killer could not send message") + def start_react_scripts(self, repairs): futures = [] if repairs: diff --git a/entropy/examples/cfg/test.cfg b/entropy/examples/cfg/test.cfg index 9f64cd9..57eac5d 100644 --- a/entropy/examples/cfg/test.cfg +++ b/entropy/examples/cfg/test.cfg @@ -6,3 +6,7 @@ test: serializer_schedule: "*/2 * * * *" engine_timeout: "25" backend: file + mq_host: "localhost" + mq_port: "5672" + mq_user: "guest" + mq_password: "guest" diff --git a/entropy/examples/repair/react.py b/entropy/examples/repair/react.py index 2c52d7d..29e2a8b 100644 --- a/entropy/examples/repair/react.py +++ b/entropy/examples/repair/react.py @@ -17,6 +17,8 @@ import logging from kombu import BrokerConnection from kombu.mixins import ConsumerMixin +from entropy import exceptions + LOG = logging.getLogger(__name__) @@ -33,6 +35,8 @@ class SomeConsumer(ConsumerMixin): def on_message(self, body, message): LOG.warning("React script %s received message: %r", self.name, body) message.ack() + if body['From'] == 'repair_killer': + raise exceptions.RepairStopException return @@ -42,7 +46,7 @@ def receive_message(**kwargs): with connection as conn: try: SomeConsumer(conn, **kwargs).run() - except KeyboardInterrupt: + except (KeyboardInterrupt, exceptions.RepairStopException): LOG.warning('Quitting %s' % __name__) diff --git a/entropy/exceptions.py b/entropy/exceptions.py index 16067e4..590f6f1 100644 --- a/entropy/exceptions.py +++ b/entropy/exceptions.py @@ -49,3 +49,7 @@ class NoEnginesException(EntropyException): class SerializerException(EntropyException): """Exception raised when the serializer fails.""" + + +class RepairStopException(EntropyException): + """Exception raised when repair scripts should be stopped."""