Start rebuilding rug main program

This commit includes a rudimentary main program and the
start of a scheduler for processing messages. It illustrates
the new architecture, but does not yet implement the dispatching
algorithm or actually listen to notifications.

Change-Id: Iedd467e9483a1ec0aea9a9133a6ede35825f2011
This commit is contained in:
Doug Hellmann 2013-07-18 17:25:08 -04:00
parent 55ebbfdbda
commit c3f76cd975
5 changed files with 164 additions and 1 deletions

3
.gitignore vendored
View File

@ -25,3 +25,6 @@ pip-log.txt
#Mr Developer
.mr.developer.cfg
# Packaging output
*.deb

67
akanda/rug/main.py Normal file
View File

@ -0,0 +1,67 @@
import logging
import multiprocessing
import sys
from oslo.config import cfg
from akanda.rug import notifications
from akanda.rug import scheduler
LOG = logging.getLogger(__name__)
def worker(message):
# TODO(dhellmann): Replace with something from the state machine
# module.
LOG.debug('got: %s', message)
def shuffle_notifications(notification_queue, sched):
"""Copy messages from the notification queue into the scheduler.
"""
while True:
try:
message = notification_queue.get()
sched.handle_message(message)
except KeyboardInterrupt:
sched.stop()
break
def main(argv=sys.argv[1:]):
cfg.CONF.register_cli_opts([
cfg.IntOpt('health-check-period',
default=60,
help='seconds between health checks'),
cfg.IntOpt('num-workers',
short='n',
default=16,
help='the number of worker processes to run'),
])
cfg.CONF(argv, project='akanda')
logging.basicConfig(
level=logging.DEBUG,
format='%(processName)s:%(name)s:%(message)s',
)
# Set up the queue to move messages between the eventlet-based
# listening process and the scheduler.
notification_queue = multiprocessing.Queue()
# Listen for notifications.
#
# TODO(dhellmann): We will need to pass config settings through
# here, or have the child process reset the cfg.CONF object.
notifications.listen(notification_queue)
# Set up the scheduler that knows how to manage the routers and
# dispatch messages.
sched = scheduler.Scheduler(
num_workers=cfg.CONF.num_workers,
worker_func=worker,
)
shuffle_notifications(notification_queue,
sched,
)
LOG.info('exiting')

View File

@ -0,0 +1,9 @@
"""Listen for notifications.
"""
def listen(notification_queue):
# TODO(dhellmann): Replace with a version of the service code from
# oslo that knows how to subscribe to notifications.
for i in range(5):
notification_queue.put({'key': 'value'})

83
akanda/rug/scheduler.py Normal file
View File

@ -0,0 +1,83 @@
"""Scheduler to send messages for a given router to the correct worker.
"""
import logging
import multiprocessing
import signal
LOG = logging.getLogger(__name__)
def _worker(inq, callback):
"""Scheduler's worker process main function.
"""
# Ignore SIGINT, since the parent will catch it and give us a
# chance to exit cleanly.
signal.signal(signal.SIGINT, signal.SIG_IGN)
LOG.debug('starting')
while True:
data = inq.get()
try:
callback(data)
except Exception:
LOG.exception('Error processing data %s' % data)
if data is None:
break
LOG.debug('exiting')
class Scheduler(object):
def __init__(self, num_workers, worker_func):
"""
:param num_workers: The number of worker processes to create.
:type num_workers: int
:param worker_func: Callable for the worker processes to use
when a notification is received.
:type worker_func: Callable taking one argument.
"""
if num_workers < 1:
raise ValueError('Need at least one worker process')
self.num_workers = num_workers
self.workers = []
# Create several worker processes, each with its own queue for
# sending it instructions based on the notifications we get
# when someone calls our handle_message() method.
for i in range(self.num_workers):
wq = multiprocessing.JoinableQueue()
worker = multiprocessing.Process(
target=_worker,
kwargs={
'inq': wq,
'callback': worker_func,
},
name='Worker %d' % i,
)
worker.start()
self.workers.append({
'queue': wq,
'worker': worker,
})
def stop(self):
"""Shutdown all workers cleanly.
"""
LOG.info('shutting down scheduler')
# Send a poison pill to all of the workers
for w in self.workers:
LOG.debug('sending stop message to %s', w['worker'].name)
w['queue'].put(None)
# Wait for the workers to finish and be ready to exit.
for w in self.workers:
LOG.debug('waiting for %s', w['worker'].name)
w['queue'].close()
w['worker'].join()
def handle_message(self, message):
"""Call this method when a new notification message is delivered. The
scheduler will distribute it to the appropriate worker.
"""
# TODO(dhellmann): Need a real dispatching algorithm here.
for w in self.workers:
w['queue'].put(message)

View File

@ -21,7 +21,8 @@ setup(
zip_safe=False,
entry_points={
'console_scripts': [
'akanda-rug-service=akanda.rug.service:main'
'akanda-rug-service=akanda.rug.service:main',
'akanda-rug-new=akanda.rug.main:main',
]
},
)