Move code into a scheduler class, part II

Moved all major functionalities, like adding audit/repair scripts,
the watchdog handler, etc to the engine class. There is still some
cleanup to do, like getting rid of some references to the globals
variables, which will come in part III, in progress.

Realized that we need full names to at least the audit.cfg and
repair.cfg files, removing all the cfg files from git, because
it makes the repo look ugly with usernames in them. Will add in
sample cfg files, even though they shouldn't really be created
manually.

Verified that the code is now in the same state as before we used
a scheduler class, ie no difference in expected behavior

Change-Id: If9eeb9201ac6dd30705c3246c304b304054dc577
This commit is contained in:
pran1990 2014-03-16 16:52:01 -07:00
parent bfd98c51a8
commit 3ac2fde405
3 changed files with 171 additions and 153 deletions

View File

@ -17,13 +17,10 @@
import argparse
from concurrent.futures import ThreadPoolExecutor
import datetime
import logging
import os
import sys
import croniter
import pause
import yaml
sys.path.insert(0, os.path.join(os.path.abspath(os.pardir)))
@ -41,35 +38,8 @@ all_futures = []
entropy_engine = None
def run_scheduler(args):
LOG.info('Starting Scheduler')
# Start watchdog thread. If any new audit/react scripts are added,
# detect and add.
# Start react and audit scripts.
all_futures.append(start_scripts('repair'))
all_futures.append(start_scripts('audit'))
def start_scripts(script_type):
if script_type == 'audit':
(running_scripts, setup_func) = (running_audits, setup_audit)
cfg = globals.AUDIT_CFG
elif script_type == 'repair':
(running_scripts, setup_func) = (running_repairs, setup_react)
cfg = globals.REPAIR_CFG
scripts = utils.load_yaml(cfg)
futures = []
for script in scripts:
if script['name'] not in running_scripts:
futures.append(setup_func(script))
LOG.warning('Running %s scripts %s', script_type,
', '.join(running_scripts))
return futures
# TODO(praneshp): for next 3 fns, read the right file from engine name and
# type, then modify that file.
def add_to_list(script_type, **kwargs):
if script_type == 'audit':
cfg_file = globals.AUDIT_CFG
@ -81,99 +51,12 @@ def add_to_list(script_type, **kwargs):
explicit_start=True))
def setup_audit(script):
LOG.warning('Setting up audit script %s', script['name'])
# Now pick out relevant info
data = dict(utils.load_yaml(script['conf']).next())
# stuff for the message queue
mq_args = {'mq_host': data['mq_host'],
'mq_port': data['mq_port'],
'mq_user': data['mq_user'],
'mq_password': data['mq_password']}
# general stuff for the audit module
# TODO(praneshp): later, fix to send only one copy of mq_args
kwargs = data
kwargs['mq_args'] = mq_args
# add this job to list of running audits
running_audits.append(script['name'])
# start a process for this audit script
future = executor.submit(start_audit, **kwargs)
return future
def setup_react(script):
LOG.warning('Setting up reactor %s', script['name'])
# Pick out relevant info
data = dict(utils.load_yaml(script['conf']).next())
react_script = data['script']
available_modules = utils.find_module(react_script, ['repair'])
LOG.info('Found these modules: %s', available_modules)
if not available_modules:
LOG.error('No module to load')
else:
imported_module = utils.import_module(available_modules[0])
kwargs = data
kwargs['conf'] = script['conf']
# add this job to list of running audits
running_repairs.append(script['name'])
future = executor.submit(imported_module.main, **kwargs)
return future
def run_audit(**kwargs):
# Put a message on the mq
#TODO(praneshp): this should be the path with register-audit
#TODO(praneshp): The whole logic in this function should be in
# try except blocks
available_modules = utils.find_module(kwargs['module'], ['audit'])
LOG.info('Found these modules: %s', available_modules)
if not available_modules:
LOG.error('No module to load')
else:
imported_module = utils.import_module(available_modules[0])
audit_obj = imported_module.Audit()
try:
audit_obj.send_message(**kwargs)
except Exception as e:
LOG.error(e)
def start_audit(**kwargs):
LOG.info("Starting audit for %s", kwargs['name'])
now = datetime.datetime.now()
schedule = kwargs['schedule']
cron = croniter.croniter(schedule, now)
next_iteration = cron.get_next(datetime.datetime)
while True:
LOG.warning('Next call at %s', next_iteration)
pause.until(next_iteration)
run_audit(**kwargs)
next_iteration = cron.get_next(datetime.datetime)
# TODO(praneshp) move this to utils
def check_duplicate(name, cfg_file):
scripts = utils.load_yaml(cfg_file)
names = [script['name'] for script in scripts]
if name in names:
return True
return False
def repair_present(name):
return check_duplicate(name, globals.REPAIR_CFG)
return utils.check_duplicate(name, globals.REPAIR_CFG)
def audit_present(name):
return check_duplicate(name, globals.AUDIT_CFG)
return utils.check_duplicate(name, globals.AUDIT_CFG)
def register_audit(args):
@ -181,7 +64,7 @@ def register_audit(args):
LOG.warning('Registering audit script %s', args.name)
#First check if you have all inputs
if not (args.conf or args.name):
if not (args.conf and args.name and args.engine):
LOG.error('Need path to script and json')
return
@ -192,7 +75,7 @@ def register_audit(args):
#Write to audit file
audit_cfg_args = {'name': args.name,
'conf': args.conf}
'conf': os.path.join(os.getcwd(), args.conf)}
add_to_list('audit', **audit_cfg_args)
LOG.info('Registered audit %s', args.name)
@ -202,7 +85,7 @@ def register_repair(args):
LOG.warning('Registering repair script %s', args.name)
#First check if you have all inputs
if not (args.conf or args.name):
if not (args.conf and args.name and args.engine):
LOG.error('Need path to script and json')
return
@ -213,16 +96,28 @@ def register_repair(args):
#Write to audit file
repair_cfg_args = {'name': args.name,
'conf': args.conf}
'conf': os.path.join(os.getcwd(), args.conf)}
add_to_list('repair', **repair_cfg_args)
LOG.info('Registered repair script %s', args.name)
def start_engine(args):
# TODO(praneshp): for now, always look in entropy/cfg for config files.
if not (args.name and args.audit_cfg and args.repair_cfg):
LOG.error('Need name, audit_cfg, and repair_cfg')
return
engine_cfg = os.path.join(os.getcwd(), 'entropy', 'cfg', 'test.cfg')
args.log_file = os.path.join(os.getcwd(), args.log_file)
args.audit_cfg = os.path.join(os.getcwd(), args.audit_cfg)
args.repair_cfg = os.path.join(os.getcwd(), args.repair_cfg)
cfg = {'audit': args.audit_cfg, 'repair': args.repair_cfg}
with open(engine_cfg, "w") as cfg_file:
cfg_file.write(yaml.dump(cfg, canonical=False,
default_flow_style=False,
explicit_start=True))
LOG.info('Wrote to engine cfg')
global entropy_engine
entropy_engine = Engine(args.name)
watchdog_thread = entropy_engine.start_watchdog(globals.CFG_DIR) # noqa
watchdog_thread.join()
entropy_engine = Engine(args)
def parse():
@ -239,6 +134,8 @@ def parse():
action='store', help='Audit script')
register_audit_parser.add_argument('-c', dest='conf', action='store',
help='Audit conf')
register_audit_parser.add_argument('-e', dest='engine', action='store',
help='Engine')
register_audit_parser.set_defaults(func=register_audit)
register_repair_parser =\
@ -248,16 +145,19 @@ def parse():
help='Repair script name')
register_repair_parser.add_argument('-c', dest='conf', action='store',
help='Repair conf')
register_repair_parser.add_argument('-e', dest='engine', action='store',
help='Engine')
register_repair_parser.set_defaults(func=register_repair)
scheduler_parser = subparsers.add_parser('start-scheduler',
help='Start scheduler')
scheduler_parser.add_argument('-v', dest='verbose', help='Verbosity')
scheduler_parser.set_defaults(func=run_scheduler)
scheduler_parser = subparsers.add_parser('start-engine',
help='Start an entropy engine')
scheduler_parser.add_argument('-n', dest='name', help='Name')
scheduler_parser.add_argument('-a', dest='audit_cfg',
help='path to audit cfg')
scheduler_parser.add_argument('-l', dest='log_file',
help='log_file')
scheduler_parser.add_argument('-r', dest='repair_cfg',
help='path to repair cfg')
scheduler_parser.set_defaults(func=start_engine)
args = parser.parse_args()
@ -268,5 +168,5 @@ if __name__ == '__main__':
#TODO(praneshp): AMQP, json->yaml, reaction scripts(after amqp)
logging.basicConfig(filename=globals.log_file,
level=logging.INFO)
level=logging.DEBUG)
parse()

View File

@ -16,39 +16,49 @@
# under the License.
from concurrent.futures import ThreadPoolExecutor
import datetime
import logging
import os
import croniter
import pause
from entropy import utils
LOG = logging.getLogger(__name__)
class Engine(object):
def __init__(self, name):
def __init__(self, args):
# constants
# TODO(praneshp): Hardcode for now, could/should be cmdline input
self.script_repo = os.path.dirname(__file__)
self.log_repo = os.path.join(os.getcwd(), 'entropy', 'logs')
self.cfg_dir = os.path.join(self.script_repo, 'cfg')
self.audit_cfg = os.path.join(self.cfg_dir, 'audit.cfg')
self.repair_cfg = os.path.join(self.cfg_dir, 'repair.cfg')
self.log_file = os.path.join(self.log_repo, 'entropy.log')
self.max_workers = 8
self.audit_type = 'audit'
self.repair_type = 'repair'
# engine variables
self.name = name
self.name = args.name
self.audit_cfg = args.audit_cfg
self.repair_cfg = args.repair_cfg
# TODO(praneshp): Assuming cfg files are in 1 dir. Change later
self.cfg_dir = os.path.dirname(self.audit_cfg)
self.log_file = args.log_file
self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
self.running_audits = []
self.running_repairs = []
self.futures = []
# TODO(praneshp): Look into how to do this with threadpoolexecutor?
watchdog_thread = self.start_watchdog(self.cfg_dir) # noqa
watchdog_thread.join()
LOG.info('Creating engine obj')
self.start_scheduler()
def start_scheduler(self):
pass
LOG.debug("Crap")
# Start watchdog thread, which will detect any new audit/react scripts
# TODO(praneshp): Look into how to do this with threadpoolexecutor?
watchdog_thread = self.start_watchdog(self.cfg_dir) # noqa
# Start react and audit scripts.
self.futures.append(self.start_scripts('repair'))
self.futures.append(self.start_scripts('audit'))
watchdog_thread.join()
def register_audit(self):
pass
@ -61,13 +71,111 @@ class Engine(object):
def audit_modified(self):
LOG.warning('Audit configuration changed')
# self.all_futures.append(start_scripts('audit'))
self.futures.append(self.start_scripts('audit'))
def repair_modified(self):
LOG.warning('Repair configuration changed')
# start_scripts('repair')
self.futures.append(self.start_scripts('repair'))
def start_watchdog(self, dir_to_watch):
event_fn = {self.audit_cfg: self.audit_modified,
self.repair_cfg: self.repair_modified}
return utils.watch_dir_for_change(dir_to_watch, event_fn)
def start_scripts(self, script_type):
if script_type == 'audit':
running_scripts = self.running_audits
setup_func = self.setup_audit
cfg = self.audit_cfg
elif script_type == 'repair':
running_scripts = self.running_repairs
setup_func = self.setup_react
cfg = self.repair_cfg
else:
LOG.error('Unknown script type %s', script_type)
return
scripts = utils.load_yaml(cfg)
futures = []
for script in scripts:
if script['name'] not in running_scripts:
futures.append(setup_func(script))
LOG.warning('Running %s scripts %s', script_type,
', '.join(running_scripts))
return futures
def setup_react(self, script):
LOG.warning('Setting up reactor %s', script['name'])
# Pick out relevant info
data = dict(utils.load_yaml(script['conf']).next())
react_script = data['script']
available_modules = utils.find_module(react_script, ['repair'])
LOG.info('Found these modules: %s', available_modules)
if not available_modules:
LOG.error('No module to load')
else:
imported_module = utils.import_module(available_modules[0])
kwargs = data
kwargs['conf'] = script['conf']
# add this job to list of running audits
self.running_repairs.append(script['name'])
future = self.executor.submit(imported_module.main, **kwargs)
return future
def setup_audit(self, script):
LOG.warning('Setting up audit script %s', script['name'])
# Now pick out relevant info
data = dict(utils.load_yaml(script['conf']).next())
# stuff for the message queue
mq_args = {'mq_host': data['mq_host'],
'mq_port': data['mq_port'],
'mq_user': data['mq_user'],
'mq_password': data['mq_password']}
# general stuff for the audit module
# TODO(praneshp): later, fix to send only one copy of mq_args
kwargs = data
kwargs['mq_args'] = mq_args
# add this job to list of running audits
self.running_audits.append(script['name'])
# start a process for this audit script
future = self.executor.submit(self.start_audit, **kwargs)
return future
def start_audit(self, **kwargs):
LOG.info("Starting audit for %s", kwargs['name'])
now = datetime.datetime.now()
schedule = kwargs['schedule']
cron = croniter.croniter(schedule, now)
next_iteration = cron.get_next(datetime.datetime)
while True:
LOG.warning('Next call at %s', next_iteration)
pause.until(next_iteration)
Engine.run_audit(**kwargs)
next_iteration = cron.get_next(datetime.datetime)
@staticmethod
def run_audit(**kwargs):
# Put a message on the mq
#TODO(praneshp): this should be the path with register-audit
#TODO(praneshp): The whole logic in this function should be in
# try except blocks
available_modules = utils.find_module(kwargs['module'], ['audit'])
LOG.info('Found these modules: %s', available_modules)
if not available_modules:
LOG.error('No module to load')
else:
imported_module = utils.import_module(available_modules[0])
audit_obj = imported_module.Audit()
try:
audit_obj.send_message(**kwargs)
except Exception as e:
LOG.error(e)

View File

@ -22,7 +22,6 @@ from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
import yaml
LOG = logging.getLogger(__name__)
@ -92,8 +91,10 @@ class WatchdogHandler(FileSystemEventHandler):
self.event_fn = event_fn
def on_modified(self, event):
LOG.warning('Monitored file changed %s', event.src_path)
self.event_fn[event.src_path]()
if event.src_path in self.event_fn.keys():
self.event_fn[event.src_path]()
else:
LOG.error('no associated function for %s', event.src_path)
def watch_dir_for_change(dir_to_watch, event_fn):
@ -102,3 +103,12 @@ def watch_dir_for_change(dir_to_watch, event_fn):
observer.schedule(event_handler, path=dir_to_watch, recursive=True)
observer.start()
return observer
# TODO(praneshp) move this to utils
def check_duplicate(name, cfg_file):
scripts = load_yaml(cfg_file)
names = [script['name'] for script in scripts]
if name in names:
return True
return False