Change code structure a bit

Use register-audit to register audit script, register-repair to
register repair script. start-scheduler to start all the react
scripts and then schedule audit scripts.

Added audit.cfg and repair.cfg files to strore registered scripts,
this will help restart after failure.

Added globals.py to store global variables

Removed validate_cfg function that wasn't doing anything

Change-Id: Id9140d2665e5710e6ffe2ed707135ff9a30ccdff
This commit is contained in:
pran1990 2013-12-29 04:01:35 -08:00
parent a16b654009
commit ee0cc7d4c7
7 changed files with 172 additions and 53 deletions

View File

@ -19,30 +19,90 @@ import argparse
import datetime
import logging
import os
import subprocess
import sys
import threading
import time
import croniter
import pause
import yaml
sys.path.insert(0, os.path.join(os.path.abspath(os.pardir)))
sys.path.insert(0, os.path.abspath(os.getcwd()))
from entropy import audit
from entropy import globals
from entropy import utils
GOOD_MOOD = 1
SCRIPT_REPO = os.path.dirname(__file__)
LOG_REPO = os.path.join(os.path.dirname(__file__), 'logs')
LOG = logging.getLogger(__name__)
def validate_cfg(file):
#TODO(praneshp): can do better here
if GOOD_MOOD == 1:
return True
return False
def run_scheduler(args):
LOG.info('Starting Scheduler')
#Start react scripts
with open(globals.REPAIR_CFG) as cfg:
scripts = yaml.load_all(cfg)
for script in scripts:
print script
setup_react(script)
#Start audit scripts
threads = []
with open(globals.AUDIT_CFG, 'r') as cfg:
scripts = yaml.load_all(cfg)
for script in scripts:
t = setup_audit(script)
threads.append(t)
# Now join on the threads so you run forever
[t.join() for t in threads]
def add_to_list(type, **kwargs):
cfg_file = globals.AUDIT_CFG if type == 'audit' else globals.REPAIR_CFG
with open(cfg_file, "a") as cfg:
cfg.write(yaml.dump(kwargs, canonical=False,
default_flow_style=False,
explicit_start=True))
def setup_audit(script):
LOG.warning('Setting up auditor %s' % script['name'])
# Now pick out relevant info
data = utils.load_yaml(script['conf'])
# stuff for the message queue
mq_args = {'mq_host': data['mq_host'],
'mq_port': data['mq_port'],
'mq_user': data['mq_user'],
'mq_password': data['mq_password']}
# general stuff for the audit module
kwargs = {'sshkey': utils.get_key_path(),
'name': data['name'],
'schedule': data['cron-freq'],
'mq_args': mq_args}
#Start a thread to run a cron job for this audit script
t = threading.Thread(name=kwargs['name'], target=start_audit,
kwargs=kwargs)
t.start()
return t
def setup_react(script):
LOG.warning('Setting up reactor %s' % script['name'])
data = utils.load_yaml(script['conf'])
react_script = data['script']
cmd = ('python ' + os.path.join(globals.SCRIPT_REPO, react_script)).split()
# cpid = os.fork()
subprocess.Popen(cmd, stdin=None, stdout=None,
stderr=None, close_fds=True)
def run_audit(**kwargs):
@ -62,50 +122,63 @@ def start_audit(**kwargs):
next_iteration = cron.get_next(datetime.datetime)
def check_duplicate(name, cfg_file):
with open(cfg_file, 'r') as cfg:
scripts = yaml.load_all(cfg)
names = [script['name'] for script in scripts]
if name in names:
return True
return False
def repair_present(name):
return check_duplicate(name, globals.REPAIR_CFG)
def audit_present(name):
return check_duplicate(name, globals.AUDIT_CFG)
def register_audit(args):
LOG.warning('Registering audit script')
#TODO(praneshp) check for sanity (file exists, imp parameters exist, etc)
LOG.warning('Registering audit script %s' % args.name)
#first check if you have all inputs
if not (args.conf or args.script):
LOG.warning('Need path to script and json')
sys.exit(1)
#First check if you have all inputs
if not (args.conf or args.script or args.name):
LOG.error('Need path to script and json')
return
# Now validate cfg
conf_file = os.path.join(SCRIPT_REPO, args.conf)
validate_cfg(conf_file)
#Check if this one is already present
if audit_present(args.name):
LOG.error('Audit already exists, not registering')
return
# Now pick out relevant info
# TODO(praneshp) eventually this must become a function call
data = utils.load_yaml(conf_file)
# stuff for the message queue
mq_args = {'mq_host': data['mq_host'],
'mq_port': data['mq_port'],
'mq_user': data['mq_user'],
'mq_password': data['mq_password']}
# general stuff for the audit module
kwargs = {'sshkey': utils.get_key_path(),
'name': data['name'],
'schedule': data['cron-freq'],
'mq_args': mq_args}
#Start a thread to run a cron job for this audit script
t = threading.Thread(name=kwargs['name'], target=start_audit,
kwargs=kwargs)
t.start()
t.join()
#TODO(praneshp): add this to a cfg file, to recover in case of failure
#Write to audit file
audit_cfg_args = {'name': args.name,
'conf': args.conf}
add_to_list('audit', **audit_cfg_args)
LOG.info('Registered audit %s' % args.name)
def register_repair(args):
LOG.warning('Registering repair script')
#TODO(praneshp) check for sanity (file exists, imp parameters exist, etc)
LOG.warning('Registering repair script %s' % args.name)
#First check if you have all inputs
if not (args.conf or args.name):
LOG.error('Need path to script and json')
return
def init():
LOG.warning('Initializing')
#TODO(praneshp): come up with to start all registered reaction scripts
#Check if this one is already present
if repair_present(args.name):
LOG.error('Repair script already exists, not registering')
return
#Write to audit file
repair_cfg_args = {'name': args.name,
'conf': args.conf}
add_to_list('repair', **repair_cfg_args)
LOG.info('Registered repair script %s' % args.name)
def parse():
@ -113,26 +186,38 @@ def parse():
subparsers = parser.add_subparsers(dest='command',
help='commands')
register_audit_parser = subparsers.add_parser('register-audit')
register_audit_parser =\
subparsers.add_parser('register-audit',
help='Register a repair script')
register_audit_parser.add_argument('-n', dest='name',
action='store', help='Name of auditor')
register_audit_parser.add_argument('-f', dest='script',
action='store', help='Audit script')
register_audit_parser.add_argument('-c', dest='conf', action='store',
help='Audit conf')
register_audit_parser.set_defaults(func=register_audit)
register_repair_parser =\
subparsers.add_parser('register-repair',
help='Register a repair script')
register_repair_parser.add_argument('-f', dest='filename', action='store',
help='Repair script location')
register_repair_parser.add_argument('-n', dest='name', action='store',
help='Repair script name')
register_repair_parser.add_argument('-c', dest='conf', action='store',
help='Repair conf')
register_repair_parser.set_defaults(func=register_repair)
scheduler_parser = subparsers.add_parser('start-scheduler',
help='Start scheduler')
scheduler_parser.add_argument('-v', dest='verbose', help='Verbosity')
scheduler_parser.set_defaults(func=run_scheduler)
args = parser.parse_args()
args.func(args)
if __name__ == '__main__':
#TODO(praneshp): AMQP, json->yaml, reaction scripts(after amqp)
logging.basicConfig(filename=os.path.join(
LOG_REPO, 'entropy-' + str(time.time()) + '.log'))
init()
logging.basicConfig(filename=globals.log_file,
level=logging.INFO)
parse()

4
entropy/audit.cfg Normal file
View File

@ -0,0 +1,4 @@
---
conf: /Users/praneshp/code/entropy/entropy/audit.json
name: test
script: /Users/praneshp/code/entropy/entropy/runthis.sh

View File

@ -7,5 +7,6 @@
"mq_host": "localhost",
"mq_port": "5672",
"mq_user": "guest",
"mq_password": "guest"
"mq_password": "guest",
"script": "runthis.sh"
}

25
entropy/globals.py Normal file
View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
SCRIPT_REPO = os.path.dirname(__file__)
LOG_REPO = os.path.join(os.path.dirname(__file__), 'logs')
AUDIT_CFG = os.path.join(SCRIPT_REPO, 'audit.cfg')
REPAIR_CFG = os.path.join(SCRIPT_REPO, 'repair.cfg')
log_file = os.path.join(LOG_REPO, 'entropy.log')

View File

@ -1,5 +1,6 @@
{
"name" : "react",
"script": "react.py",
"hostname" : "localhost",
"username" : "praneshp",
"ssh-key" : "id_rsa",

View File

@ -24,6 +24,7 @@ from queues import pass_events
SCRIPT_REPO = os.path.dirname(__file__)
conf_file = os.path.join(SCRIPT_REPO, 'react.json')
LOG = logging.getLogger(__name__)
LOG_REPO = os.path.join(os.path.dirname(__file__), 'logs')
class SomeConsumer(ConsumerMixin):
@ -62,8 +63,7 @@ def parse_conf():
if __name__ == '__main__':
#can log to stdout for now
logging.basicConfig()
logging.basicConfig(filename=os.path.join(LOG_REPO, 'react.log'))
LOG.warning('starting react script %s' % __file__)
mq_args = parse_conf()
recv_message(**mq_args)

3
entropy/repair.cfg Normal file
View File

@ -0,0 +1,3 @@
---
conf: entropy/react.json
name: test