diff --git a/.gitignore b/.gitignore index f24cd995..c40d562d 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,6 @@ pip-log.txt #Mr Developer .mr.developer.cfg + +# Packaging output +*.deb diff --git a/akanda/rug/main.py b/akanda/rug/main.py new file mode 100644 index 00000000..3dc7efb3 --- /dev/null +++ b/akanda/rug/main.py @@ -0,0 +1,67 @@ +import logging +import multiprocessing +import sys + +from oslo.config import cfg + +from akanda.rug import notifications +from akanda.rug import scheduler + +LOG = logging.getLogger(__name__) + + +def worker(message): + # TODO(dhellmann): Replace with something from the state machine + # module. + LOG.debug('got: %s', message) + + +def shuffle_notifications(notification_queue, sched): + """Copy messages from the notification queue into the scheduler. + """ + while True: + try: + message = notification_queue.get() + sched.handle_message(message) + except KeyboardInterrupt: + sched.stop() + break + + +def main(argv=sys.argv[1:]): + cfg.CONF.register_cli_opts([ + cfg.IntOpt('health-check-period', + default=60, + help='seconds between health checks'), + cfg.IntOpt('num-workers', + short='n', + default=16, + help='the number of worker processes to run'), + ]) + cfg.CONF(argv, project='akanda') + logging.basicConfig( + level=logging.DEBUG, + format='%(processName)s:%(name)s:%(message)s', + ) + + # Set up the queue to move messages between the eventlet-based + # listening process and the scheduler. + notification_queue = multiprocessing.Queue() + + # Listen for notifications. + # + # TODO(dhellmann): We will need to pass config settings through + # here, or have the child process reset the cfg.CONF object. + notifications.listen(notification_queue) + + # Set up the scheduler that knows how to manage the routers and + # dispatch messages. + sched = scheduler.Scheduler( + num_workers=cfg.CONF.num_workers, + worker_func=worker, + ) + shuffle_notifications(notification_queue, + sched, + ) + + LOG.info('exiting') diff --git a/akanda/rug/notifications.py b/akanda/rug/notifications.py new file mode 100644 index 00000000..4ecaf1d2 --- /dev/null +++ b/akanda/rug/notifications.py @@ -0,0 +1,9 @@ +"""Listen for notifications. +""" + + +def listen(notification_queue): + # TODO(dhellmann): Replace with a version of the service code from + # oslo that knows how to subscribe to notifications. + for i in range(5): + notification_queue.put({'key': 'value'}) diff --git a/akanda/rug/scheduler.py b/akanda/rug/scheduler.py new file mode 100644 index 00000000..3dcc9dfb --- /dev/null +++ b/akanda/rug/scheduler.py @@ -0,0 +1,83 @@ +"""Scheduler to send messages for a given router to the correct worker. +""" + +import logging +import multiprocessing +import signal + + +LOG = logging.getLogger(__name__) + + +def _worker(inq, callback): + """Scheduler's worker process main function. + """ + # Ignore SIGINT, since the parent will catch it and give us a + # chance to exit cleanly. + signal.signal(signal.SIGINT, signal.SIG_IGN) + LOG.debug('starting') + while True: + data = inq.get() + try: + callback(data) + except Exception: + LOG.exception('Error processing data %s' % data) + if data is None: + break + LOG.debug('exiting') + + +class Scheduler(object): + + def __init__(self, num_workers, worker_func): + """ + :param num_workers: The number of worker processes to create. + :type num_workers: int + :param worker_func: Callable for the worker processes to use + when a notification is received. + :type worker_func: Callable taking one argument. + """ + if num_workers < 1: + raise ValueError('Need at least one worker process') + self.num_workers = num_workers + self.workers = [] + # Create several worker processes, each with its own queue for + # sending it instructions based on the notifications we get + # when someone calls our handle_message() method. + for i in range(self.num_workers): + wq = multiprocessing.JoinableQueue() + worker = multiprocessing.Process( + target=_worker, + kwargs={ + 'inq': wq, + 'callback': worker_func, + }, + name='Worker %d' % i, + ) + worker.start() + self.workers.append({ + 'queue': wq, + 'worker': worker, + }) + + def stop(self): + """Shutdown all workers cleanly. + """ + LOG.info('shutting down scheduler') + # Send a poison pill to all of the workers + for w in self.workers: + LOG.debug('sending stop message to %s', w['worker'].name) + w['queue'].put(None) + # Wait for the workers to finish and be ready to exit. + for w in self.workers: + LOG.debug('waiting for %s', w['worker'].name) + w['queue'].close() + w['worker'].join() + + def handle_message(self, message): + """Call this method when a new notification message is delivered. The + scheduler will distribute it to the appropriate worker. + """ + # TODO(dhellmann): Need a real dispatching algorithm here. + for w in self.workers: + w['queue'].put(message) diff --git a/setup.py b/setup.py index 9dc53a03..be607cb5 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,8 @@ setup( zip_safe=False, entry_points={ 'console_scripts': [ - 'akanda-rug-service=akanda.rug.service:main' + 'akanda-rug-service=akanda.rug.service:main', + 'akanda-rug-new=akanda.rug.main:main', ] }, )