Enable stevedore and dynamic loading

Load modules dynamically, allowing better control over audit/react scripts
Change code structure a bit (put audit scripts in audit/ dir, react scripts
in repair/ dir)
Enable stevedore, for audit/react scripts installed with the package
Remove all homedir references

Change-Id: I7351d6b7cd9ca5ba9cfa9526dfbefbfecacc3dc8
This commit is contained in:
pran1990 2014-01-26 16:06:42 -08:00
parent ee0cc7d4c7
commit 5fc67635ad
17 changed files with 234 additions and 71 deletions

View File

@ -19,18 +19,17 @@ import argparse
import datetime
import logging
import os
import subprocess
import sys
import threading
import croniter
import pause
from stevedore import driver
import yaml
sys.path.insert(0, os.path.join(os.path.abspath(os.pardir)))
sys.path.insert(0, os.path.abspath(os.getcwd()))
from entropy import audit
from entropy import globals
from entropy import utils
@ -39,13 +38,14 @@ LOG = logging.getLogger(__name__)
def run_scheduler(args):
LOG.info('Starting Scheduler')
#Start react scripts
#Start react scripts. No need to join because all the react scripts are
#designed to be looping forever, for now.
react_threads = []
with open(globals.REPAIR_CFG) as cfg:
scripts = yaml.load_all(cfg)
for script in scripts:
print script
setup_react(script)
t = setup_react(script)
react_threads.append(t)
#Start audit scripts
threads = []
@ -69,7 +69,6 @@ def add_to_list(type, **kwargs):
def setup_audit(script):
LOG.warning('Setting up auditor %s' % script['name'])
# Now pick out relevant info
data = utils.load_yaml(script['conf'])
@ -98,16 +97,40 @@ def setup_react(script):
data = utils.load_yaml(script['conf'])
react_script = data['script']
cmd = ('python ' + os.path.join(globals.SCRIPT_REPO, react_script)).split()
# cpid = os.fork()
subprocess.Popen(cmd, stdin=None, stdout=None,
stderr=None, close_fds=True)
available_modules = utils.find_module(react_script, ['repair'])
LOG.info('Found these modules: %s' % available_modules)
if not available_modules:
LOG.error('No module to load')
else:
imported_module = utils.import_module(available_modules[0])
t = threading.Thread(name=data['name'], target=imported_module.main)
t.start()
return t
def run_audit(**kwargs):
# Put a message on the mq
audit.send_message(**kwargs)
#TODO(praneshp): this should be the path with register-audit
available_modules = utils.find_module('audit', ['audit'])
LOG.info('Found these modules: %s' % available_modules)
if not available_modules:
LOG.error('No module to load')
else:
imported_module = utils.import_module(available_modules[0])
audit_obj = imported_module.Audit()
audit_obj.send_message(**kwargs)
try:
LOG.info('Trying stevedore')
mgr = driver.DriverManager(
namespace='entropy.audit',
name='test',
invoke_on_load=True
)
LOG.info('mgr is %s' % mgr)
mgr.driver.test()
except Exception as e:
LOG.error(e)
def start_audit(**kwargs):
@ -144,7 +167,7 @@ def register_audit(args):
LOG.warning('Registering audit script %s' % args.name)
#First check if you have all inputs
if not (args.conf or args.script or args.name):
if not (args.conf or args.name):
LOG.error('Need path to script and json')
return

View File

@ -1,4 +1,3 @@
---
conf: /Users/praneshp/code/entropy/entropy/audit.json
conf: entropy/audit/audit.json
name: test
script: /Users/praneshp/code/entropy/entropy/runthis.sh

View File

@ -1,38 +0,0 @@
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from kombu import BrokerConnection
from kombu.common import maybe_declare
from kombu.pools import producers
from queues import entropy_exchange
from queues import PASS_KEY
#TODO(praneshp) : this should be read from a conf file.
def send_message(**kwargs):
connection = BrokerConnection('amqp://%(mq_user)s:%(mq_password)s@'
'%(mq_host)s:%(mq_port)s//' % kwargs)
message = {'From': __file__,
'Date': str(datetime.datetime.now())}
with producers[connection].acquire(block=True) as producer:
maybe_declare(entropy_exchange, producer.channel)
producer.publish(message,
exchange=entropy_exchange,
routing_key=PASS_KEY,
serializer='json')

View File

@ -1,10 +0,0 @@
#!/bin/bash
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
set -o errexit
set -o xtrace
# print the top five process ids.
ps aux | awk 'NR > 1 {print $1, $2, $12} ' | tail -n 5

13
entropy/audit/__init__.py Normal file
View File

@ -0,0 +1,13 @@
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -1,7 +1,7 @@
{
"name" : "audit",
"hostname" : "localhost",
"cron-freq" :"*/5 * * * *",
"cron-freq" :"*/3 * * * *",
"username" : "praneshp",
"ssh-key" : "id_rsa",
"mq_host": "localhost",

44
entropy/audit/audit.py Normal file
View File

@ -0,0 +1,44 @@
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import logging
from kombu import BrokerConnection
from kombu.common import maybe_declare
from kombu.pools import producers
import base
from entropy.queues import entropy_exchange
from entropy.queues import PASS_KEY
LOG = logging.getLogger(__name__)
class Audit(base.AuditBase):
def test(self):
LOG.info('hello world')
def send_message(self, **kwargs):
connection = BrokerConnection('amqp://%(mq_user)s:%(mq_password)s@'
'%(mq_host)s:%(mq_port)s//' % kwargs)
message = {'From': __file__,
'Date': str(datetime.datetime.now())}
with producers[connection].acquire(block=True) as producer:
maybe_declare(entropy_exchange, producer.channel)
producer.publish(message,
exchange=entropy_exchange,
routing_key=PASS_KEY,
serializer='json')

30
entropy/audit/base.py Normal file
View File

@ -0,0 +1,30 @@
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
LOG = logging.getLogger(__name__)
class AuditBase(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def send_message(self, **kwargs):
pass
@abc.abstractmethod
def test(self):
pass

View File

@ -19,7 +19,8 @@ import os
SCRIPT_REPO = os.path.dirname(__file__)
LOG_REPO = os.path.join(os.path.dirname(__file__), 'logs')
# Hardcode for now
LOG_REPO = os.path.join(os.getcwd(), 'entropy', 'logs')
AUDIT_CFG = os.path.join(SCRIPT_REPO, 'audit.cfg')
REPAIR_CFG = os.path.join(SCRIPT_REPO, 'repair.cfg')
log_file = os.path.join(LOG_REPO, 'entropy.log')

View File

@ -1,3 +1,3 @@
---
conf: entropy/react.json
conf: entropy/repair/react.json
name: test

View File

@ -0,0 +1,13 @@
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

30
entropy/repair/base.py Normal file
View File

@ -0,0 +1,30 @@
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
LOG = logging.getLogger(__name__)
class RepairBase(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def send_message(self, **kwargs):
pass
@abc.abstractmethod
def test(self):
pass

View File

@ -1,6 +1,6 @@
{
"name" : "react",
"script": "react.py",
"script": "react",
"hostname" : "localhost",
"username" : "praneshp",
"ssh-key" : "id_rsa",

View File

@ -18,13 +18,13 @@ import os
from kombu import BrokerConnection
from kombu.mixins import ConsumerMixin
from queues import pass_events
from entropy.queues import pass_events
SCRIPT_REPO = os.path.dirname(__file__)
conf_file = os.path.join(SCRIPT_REPO, 'react.json')
LOG = logging.getLogger(__name__)
LOG_REPO = os.path.join(os.path.dirname(__file__), 'logs')
LOG_REPO = os.path.join(os.getcwd(), 'entropy', 'logs')
class SomeConsumer(ConsumerMixin):
@ -62,8 +62,12 @@ def parse_conf():
return mq_args
if __name__ == '__main__':
def main():
logging.basicConfig(filename=os.path.join(LOG_REPO, 'react.log'))
LOG.warning('starting react script %s' % __file__)
mq_args = parse_conf()
recv_message(**mq_args)
if __name__ == '__main__':
main()

View File

@ -13,10 +13,15 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import sys
import yaml
LOG = logging.getLogger(__name__)
def get_key_path():
home_dir = os.path.expanduser("~")
@ -33,3 +38,47 @@ def get_key_path():
def load_yaml(filename):
with open(filename, "rb") as fh:
return yaml.safe_load(fh.read())
# importer functions.
# From cloudinit http://bazaar.launchpad.net/~cloud-init-dev/cloud-init/
# trunk/view/head:/cloudinit/importer.py
def import_module(module_name):
__import__(module_name)
return sys.modules[module_name]
def find_module(base_name, search_paths, required_attrs=None):
found_places = []
if not required_attrs:
required_attrs = []
# NOTE(harlowja): translate the search paths to include the base name.
real_paths = []
for path in search_paths:
real_path = []
if path:
real_path.extend(path.split("."))
real_path.append(base_name)
full_path = '.'.join(real_path)
real_paths.append(full_path)
LOG.info("Looking for modules %s that have attributes %s",
real_paths, required_attrs)
for full_path in real_paths:
mod = None
try:
mod = import_module(full_path)
except ImportError as e:
LOG.debug("Failed at attempted import of '%s' due to: %s",
full_path, e)
if not mod:
continue
found_attrs = 0
for attr in required_attrs:
if hasattr(mod, attr):
found_attrs += 1
if found_attrs == len(required_attrs):
found_places.append(full_path)
LOG.info("Found %s with attributes %s in %s", base_name,
required_attrs, found_places)
return found_places

View File

@ -5,3 +5,4 @@ croniter>=0.3.3
kombu==3.0.7
PyYAML>=3.1.0
pause==0.1.2
stevedore>0.10

View File

@ -22,6 +22,10 @@ classifier =
Programming Language :: Python :: 2.7
Programming Language :: Python :: 3.3
[entry_points]
entropy.audit =
test = entropy.audit.audit:Audit
[global]
setup-hooks =
pbr.hooks.setup_hook