Start rebuilding rug main program
This commit includes a rudimentary main program and the start of a scheduler for processing messages. It illustrates the new architecture, but does not yet implement the dispatching algorithm or actually listen to notifications. Change-Id: Iedd467e9483a1ec0aea9a9133a6ede35825f2011
This commit is contained in:
parent
55ebbfdbda
commit
c3f76cd975
|
@ -25,3 +25,6 @@ pip-log.txt
|
|||
|
||||
#Mr Developer
|
||||
.mr.developer.cfg
|
||||
|
||||
# Packaging output
|
||||
*.deb
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
import logging
|
||||
import multiprocessing
|
||||
import sys
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from akanda.rug import notifications
|
||||
from akanda.rug import scheduler
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def worker(message):
|
||||
# TODO(dhellmann): Replace with something from the state machine
|
||||
# module.
|
||||
LOG.debug('got: %s', message)
|
||||
|
||||
|
||||
def shuffle_notifications(notification_queue, sched):
|
||||
"""Copy messages from the notification queue into the scheduler.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
message = notification_queue.get()
|
||||
sched.handle_message(message)
|
||||
except KeyboardInterrupt:
|
||||
sched.stop()
|
||||
break
|
||||
|
||||
|
||||
def main(argv=sys.argv[1:]):
|
||||
cfg.CONF.register_cli_opts([
|
||||
cfg.IntOpt('health-check-period',
|
||||
default=60,
|
||||
help='seconds between health checks'),
|
||||
cfg.IntOpt('num-workers',
|
||||
short='n',
|
||||
default=16,
|
||||
help='the number of worker processes to run'),
|
||||
])
|
||||
cfg.CONF(argv, project='akanda')
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format='%(processName)s:%(name)s:%(message)s',
|
||||
)
|
||||
|
||||
# Set up the queue to move messages between the eventlet-based
|
||||
# listening process and the scheduler.
|
||||
notification_queue = multiprocessing.Queue()
|
||||
|
||||
# Listen for notifications.
|
||||
#
|
||||
# TODO(dhellmann): We will need to pass config settings through
|
||||
# here, or have the child process reset the cfg.CONF object.
|
||||
notifications.listen(notification_queue)
|
||||
|
||||
# Set up the scheduler that knows how to manage the routers and
|
||||
# dispatch messages.
|
||||
sched = scheduler.Scheduler(
|
||||
num_workers=cfg.CONF.num_workers,
|
||||
worker_func=worker,
|
||||
)
|
||||
shuffle_notifications(notification_queue,
|
||||
sched,
|
||||
)
|
||||
|
||||
LOG.info('exiting')
|
|
@ -0,0 +1,9 @@
|
|||
"""Listen for notifications.
|
||||
"""
|
||||
|
||||
|
||||
def listen(notification_queue):
|
||||
# TODO(dhellmann): Replace with a version of the service code from
|
||||
# oslo that knows how to subscribe to notifications.
|
||||
for i in range(5):
|
||||
notification_queue.put({'key': 'value'})
|
|
@ -0,0 +1,83 @@
|
|||
"""Scheduler to send messages for a given router to the correct worker.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import multiprocessing
|
||||
import signal
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _worker(inq, callback):
|
||||
"""Scheduler's worker process main function.
|
||||
"""
|
||||
# Ignore SIGINT, since the parent will catch it and give us a
|
||||
# chance to exit cleanly.
|
||||
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||
LOG.debug('starting')
|
||||
while True:
|
||||
data = inq.get()
|
||||
try:
|
||||
callback(data)
|
||||
except Exception:
|
||||
LOG.exception('Error processing data %s' % data)
|
||||
if data is None:
|
||||
break
|
||||
LOG.debug('exiting')
|
||||
|
||||
|
||||
class Scheduler(object):
|
||||
|
||||
def __init__(self, num_workers, worker_func):
|
||||
"""
|
||||
:param num_workers: The number of worker processes to create.
|
||||
:type num_workers: int
|
||||
:param worker_func: Callable for the worker processes to use
|
||||
when a notification is received.
|
||||
:type worker_func: Callable taking one argument.
|
||||
"""
|
||||
if num_workers < 1:
|
||||
raise ValueError('Need at least one worker process')
|
||||
self.num_workers = num_workers
|
||||
self.workers = []
|
||||
# Create several worker processes, each with its own queue for
|
||||
# sending it instructions based on the notifications we get
|
||||
# when someone calls our handle_message() method.
|
||||
for i in range(self.num_workers):
|
||||
wq = multiprocessing.JoinableQueue()
|
||||
worker = multiprocessing.Process(
|
||||
target=_worker,
|
||||
kwargs={
|
||||
'inq': wq,
|
||||
'callback': worker_func,
|
||||
},
|
||||
name='Worker %d' % i,
|
||||
)
|
||||
worker.start()
|
||||
self.workers.append({
|
||||
'queue': wq,
|
||||
'worker': worker,
|
||||
})
|
||||
|
||||
def stop(self):
|
||||
"""Shutdown all workers cleanly.
|
||||
"""
|
||||
LOG.info('shutting down scheduler')
|
||||
# Send a poison pill to all of the workers
|
||||
for w in self.workers:
|
||||
LOG.debug('sending stop message to %s', w['worker'].name)
|
||||
w['queue'].put(None)
|
||||
# Wait for the workers to finish and be ready to exit.
|
||||
for w in self.workers:
|
||||
LOG.debug('waiting for %s', w['worker'].name)
|
||||
w['queue'].close()
|
||||
w['worker'].join()
|
||||
|
||||
def handle_message(self, message):
|
||||
"""Call this method when a new notification message is delivered. The
|
||||
scheduler will distribute it to the appropriate worker.
|
||||
"""
|
||||
# TODO(dhellmann): Need a real dispatching algorithm here.
|
||||
for w in self.workers:
|
||||
w['queue'].put(message)
|
Loading…
Reference in New Issue