Add logic to stop repair scripts
When watchdog detects that repair script(s) have been killed, get a list of scripts to nuke and pass to stop_repair_scripts. Then, get its routing key(s), and send a message from a special user to any queue listening on those keys. Modified an example repair script to show how it could be killed, but need a more concrete way that that. For now, messages from 'react_killer' will raise the RepairStoppedException, which will stop react scripts Modified the example engine cfg to have some details about the kombu connection to use. Implements blueprint kill-repair-scripts Change-Id: I67e15e9b9ebb5d36c5cb0e01995bc95f7a73b3dd
This commit is contained in:
parent
0173ef0b53
commit
5b647e5f79
|
@ -54,6 +54,8 @@ def _add_to_list(engine, script_type, script_name, **script_args):
|
|||
}
|
||||
backend.add_script(script_type, data)
|
||||
return True
|
||||
except KeyError:
|
||||
LOG.exception("No %s script called %s", script_type, script_name)
|
||||
except Exception:
|
||||
LOG.exception("Could not register %s script %s", script_type,
|
||||
script_name)
|
||||
|
|
|
@ -24,7 +24,10 @@ import tempfile
|
|||
|
||||
from concurrent import futures as cf
|
||||
import croniter
|
||||
from kombu import BrokerConnection
|
||||
from kombu.common import maybe_declare
|
||||
from kombu import Exchange
|
||||
from kombu.pools import producers
|
||||
from kombu import Queue
|
||||
import pause
|
||||
import six
|
||||
|
@ -84,6 +87,14 @@ class Engine(object):
|
|||
# State related variables
|
||||
self._state = states.ENABLED
|
||||
|
||||
# Variables for mq.
|
||||
self._mq_args = {
|
||||
'mq_user': cfg_data['mq_user'],
|
||||
'mq_password': cfg_data['mq_password'],
|
||||
'mq_host': cfg_data['mq_host'],
|
||||
'mq_port': cfg_data['mq_port']
|
||||
}
|
||||
|
||||
LOG.info('Created engine obj %s', self.name)
|
||||
|
||||
# TODO(praneshp): Move to utils?
|
||||
|
@ -249,7 +260,10 @@ class Engine(object):
|
|||
repairs_to_delete.append(repair)
|
||||
LOG.info('Will add new repairs: %s', new_repairs)
|
||||
LOG.info('Will nuke repairs: %s', repairs_to_delete)
|
||||
self.futures.extend(self.start_react_scripts(new_repairs))
|
||||
if new_repairs:
|
||||
self.futures.extend(self.start_react_scripts(new_repairs))
|
||||
if repairs_to_delete:
|
||||
self.stop_react_scripts(repairs_to_delete)
|
||||
|
||||
def start_watchdog(self):
|
||||
LOG.debug('Watchdog mapping is: ', self._watchdog_event_fn)
|
||||
|
@ -286,6 +300,45 @@ class Engine(object):
|
|||
repairs = self._backend_driver.get_repairs()
|
||||
return repairs
|
||||
|
||||
def stop_react_scripts(self, repairs_to_stop):
|
||||
# current react scripts
|
||||
LOG.info("Currently running react scripts: %s", self._repairs)
|
||||
for repair in repairs_to_stop:
|
||||
self.stop_react(repair)
|
||||
# react scripts at the end
|
||||
LOG.info("Currently running react scripts: %s", self._repairs)
|
||||
|
||||
def stop_react(self, repair):
|
||||
LOG.info("Stopping react script %s", repair)
|
||||
# Get what the keywords are
|
||||
routing_key = self._known_routing_keys[repair]
|
||||
# remove the repair script from our known set.
|
||||
self._known_routing_keys.pop(repair)
|
||||
# put out a special message, repair script will see that and die.
|
||||
self._send_killer_message(routing_key)
|
||||
LOG.info("Stopped react script %s", repair)
|
||||
|
||||
def _send_killer_message(self, routing_key):
|
||||
# NOTE(praneshp): routing_key is a list
|
||||
# TODO(praneshp): we'll figure out a way to do this better.
|
||||
connection = BrokerConnection('amqp://%(mq_user)s:%(mq_password)s@'
|
||||
'%(mq_host)s:%(mq_port)s//' %
|
||||
self._mq_args)
|
||||
message = {'From': 'repair_killer',
|
||||
'Date': str(datetime.datetime.now().isoformat())}
|
||||
|
||||
with producers[connection].acquire(block=True) as producer:
|
||||
try:
|
||||
maybe_declare(self.entropy_exchange, producer.channel)
|
||||
for rk in routing_key:
|
||||
producer.publish(message,
|
||||
exchange=self.entropy_exchange,
|
||||
routing_key=rk,
|
||||
serializer='json')
|
||||
LOG.debug("React killer published message")
|
||||
except Exception:
|
||||
LOG.exception("React killer could not send message")
|
||||
|
||||
def start_react_scripts(self, repairs):
|
||||
futures = []
|
||||
if repairs:
|
||||
|
|
|
@ -6,3 +6,7 @@ test:
|
|||
serializer_schedule: "*/2 * * * *"
|
||||
engine_timeout: "25"
|
||||
backend: file
|
||||
mq_host: "localhost"
|
||||
mq_port: "5672"
|
||||
mq_user: "guest"
|
||||
mq_password: "guest"
|
||||
|
|
|
@ -17,6 +17,8 @@ import logging
|
|||
from kombu import BrokerConnection
|
||||
from kombu.mixins import ConsumerMixin
|
||||
|
||||
from entropy import exceptions
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -33,6 +35,8 @@ class SomeConsumer(ConsumerMixin):
|
|||
def on_message(self, body, message):
|
||||
LOG.warning("React script %s received message: %r", self.name, body)
|
||||
message.ack()
|
||||
if body['From'] == 'repair_killer':
|
||||
raise exceptions.RepairStopException
|
||||
return
|
||||
|
||||
|
||||
|
@ -42,7 +46,7 @@ def receive_message(**kwargs):
|
|||
with connection as conn:
|
||||
try:
|
||||
SomeConsumer(conn, **kwargs).run()
|
||||
except KeyboardInterrupt:
|
||||
except (KeyboardInterrupt, exceptions.RepairStopException):
|
||||
LOG.warning('Quitting %s' % __name__)
|
||||
|
||||
|
||||
|
|
|
@ -49,3 +49,7 @@ class NoEnginesException(EntropyException):
|
|||
|
||||
class SerializerException(EntropyException):
|
||||
"""Exception raised when the serializer fails."""
|
||||
|
||||
|
||||
class RepairStopException(EntropyException):
|
||||
"""Exception raised when repair scripts should be stopped."""
|
||||
|
|
Loading…
Reference in New Issue