Start rebuilding rug main program
This commit includes a rudimentary main program and the start of a scheduler for processing messages. It illustrates the new architecture, but does not yet implement the dispatching algorithm or actually listen to notifications. Change-Id: Iedd467e9483a1ec0aea9a9133a6ede35825f2011
This commit is contained in:
parent
55ebbfdbda
commit
c3f76cd975
|
@ -25,3 +25,6 @@ pip-log.txt
|
||||||
|
|
||||||
#Mr Developer
|
#Mr Developer
|
||||||
.mr.developer.cfg
|
.mr.developer.cfg
|
||||||
|
|
||||||
|
# Packaging output
|
||||||
|
*.deb
|
||||||
|
|
|
@ -0,0 +1,67 @@
|
||||||
|
import logging
|
||||||
|
import multiprocessing
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from oslo.config import cfg
|
||||||
|
|
||||||
|
from akanda.rug import notifications
|
||||||
|
from akanda.rug import scheduler
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def worker(message):
|
||||||
|
# TODO(dhellmann): Replace with something from the state machine
|
||||||
|
# module.
|
||||||
|
LOG.debug('got: %s', message)
|
||||||
|
|
||||||
|
|
||||||
|
def shuffle_notifications(notification_queue, sched):
|
||||||
|
"""Copy messages from the notification queue into the scheduler.
|
||||||
|
"""
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
message = notification_queue.get()
|
||||||
|
sched.handle_message(message)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
sched.stop()
|
||||||
|
break
|
||||||
|
|
||||||
|
|
||||||
|
def main(argv=sys.argv[1:]):
|
||||||
|
cfg.CONF.register_cli_opts([
|
||||||
|
cfg.IntOpt('health-check-period',
|
||||||
|
default=60,
|
||||||
|
help='seconds between health checks'),
|
||||||
|
cfg.IntOpt('num-workers',
|
||||||
|
short='n',
|
||||||
|
default=16,
|
||||||
|
help='the number of worker processes to run'),
|
||||||
|
])
|
||||||
|
cfg.CONF(argv, project='akanda')
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.DEBUG,
|
||||||
|
format='%(processName)s:%(name)s:%(message)s',
|
||||||
|
)
|
||||||
|
|
||||||
|
# Set up the queue to move messages between the eventlet-based
|
||||||
|
# listening process and the scheduler.
|
||||||
|
notification_queue = multiprocessing.Queue()
|
||||||
|
|
||||||
|
# Listen for notifications.
|
||||||
|
#
|
||||||
|
# TODO(dhellmann): We will need to pass config settings through
|
||||||
|
# here, or have the child process reset the cfg.CONF object.
|
||||||
|
notifications.listen(notification_queue)
|
||||||
|
|
||||||
|
# Set up the scheduler that knows how to manage the routers and
|
||||||
|
# dispatch messages.
|
||||||
|
sched = scheduler.Scheduler(
|
||||||
|
num_workers=cfg.CONF.num_workers,
|
||||||
|
worker_func=worker,
|
||||||
|
)
|
||||||
|
shuffle_notifications(notification_queue,
|
||||||
|
sched,
|
||||||
|
)
|
||||||
|
|
||||||
|
LOG.info('exiting')
|
|
@ -0,0 +1,9 @@
|
||||||
|
"""Listen for notifications.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def listen(notification_queue):
|
||||||
|
# TODO(dhellmann): Replace with a version of the service code from
|
||||||
|
# oslo that knows how to subscribe to notifications.
|
||||||
|
for i in range(5):
|
||||||
|
notification_queue.put({'key': 'value'})
|
|
@ -0,0 +1,83 @@
|
||||||
|
"""Scheduler to send messages for a given router to the correct worker.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import multiprocessing
|
||||||
|
import signal
|
||||||
|
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _worker(inq, callback):
|
||||||
|
"""Scheduler's worker process main function.
|
||||||
|
"""
|
||||||
|
# Ignore SIGINT, since the parent will catch it and give us a
|
||||||
|
# chance to exit cleanly.
|
||||||
|
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||||
|
LOG.debug('starting')
|
||||||
|
while True:
|
||||||
|
data = inq.get()
|
||||||
|
try:
|
||||||
|
callback(data)
|
||||||
|
except Exception:
|
||||||
|
LOG.exception('Error processing data %s' % data)
|
||||||
|
if data is None:
|
||||||
|
break
|
||||||
|
LOG.debug('exiting')
|
||||||
|
|
||||||
|
|
||||||
|
class Scheduler(object):
|
||||||
|
|
||||||
|
def __init__(self, num_workers, worker_func):
|
||||||
|
"""
|
||||||
|
:param num_workers: The number of worker processes to create.
|
||||||
|
:type num_workers: int
|
||||||
|
:param worker_func: Callable for the worker processes to use
|
||||||
|
when a notification is received.
|
||||||
|
:type worker_func: Callable taking one argument.
|
||||||
|
"""
|
||||||
|
if num_workers < 1:
|
||||||
|
raise ValueError('Need at least one worker process')
|
||||||
|
self.num_workers = num_workers
|
||||||
|
self.workers = []
|
||||||
|
# Create several worker processes, each with its own queue for
|
||||||
|
# sending it instructions based on the notifications we get
|
||||||
|
# when someone calls our handle_message() method.
|
||||||
|
for i in range(self.num_workers):
|
||||||
|
wq = multiprocessing.JoinableQueue()
|
||||||
|
worker = multiprocessing.Process(
|
||||||
|
target=_worker,
|
||||||
|
kwargs={
|
||||||
|
'inq': wq,
|
||||||
|
'callback': worker_func,
|
||||||
|
},
|
||||||
|
name='Worker %d' % i,
|
||||||
|
)
|
||||||
|
worker.start()
|
||||||
|
self.workers.append({
|
||||||
|
'queue': wq,
|
||||||
|
'worker': worker,
|
||||||
|
})
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""Shutdown all workers cleanly.
|
||||||
|
"""
|
||||||
|
LOG.info('shutting down scheduler')
|
||||||
|
# Send a poison pill to all of the workers
|
||||||
|
for w in self.workers:
|
||||||
|
LOG.debug('sending stop message to %s', w['worker'].name)
|
||||||
|
w['queue'].put(None)
|
||||||
|
# Wait for the workers to finish and be ready to exit.
|
||||||
|
for w in self.workers:
|
||||||
|
LOG.debug('waiting for %s', w['worker'].name)
|
||||||
|
w['queue'].close()
|
||||||
|
w['worker'].join()
|
||||||
|
|
||||||
|
def handle_message(self, message):
|
||||||
|
"""Call this method when a new notification message is delivered. The
|
||||||
|
scheduler will distribute it to the appropriate worker.
|
||||||
|
"""
|
||||||
|
# TODO(dhellmann): Need a real dispatching algorithm here.
|
||||||
|
for w in self.workers:
|
||||||
|
w['queue'].put(message)
|
Loading…
Reference in New Issue