152 lines
5.0 KiB
Python
152 lines
5.0 KiB
Python
# Copyright 2014 DreamHost, LLC
|
|
#
|
|
# Author: DreamHost, LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
|
|
"""Scheduler to send messages for a given router to the correct worker.
|
|
"""
|
|
|
|
import logging
|
|
import multiprocessing
|
|
import uuid
|
|
|
|
from akanda.rug import commands
|
|
from akanda.rug import daemon
|
|
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
def _worker(inq, worker_factory):
    """Scheduler's worker process main function.

    Creates a worker by calling ``worker_factory()`` and then loops,
    reading items from ``inq`` and passing each one to the worker's
    ``handle_message()`` method.

    Each item is expected to be either a ``(target, message)`` pair or
    ``None``. ``None`` is the shutdown signal: the worker is invoked
    one final time with ``(None, None)`` and then the loop ends. An
    ``IOError`` raised while reading from the queue is handled exactly
    like a ``None`` item.

    Exceptions raised by ``handle_message()`` are logged and do not
    stop the loop.

    :param inq: Queue to read work items from.
    :param worker_factory: Zero-argument callable returning an object
                           that provides
                           ``handle_message(target, message)``.
    """
    daemon.ignore_signals()
    LOG.debug('starting worker process')
    worker = worker_factory()
    while True:
        try:
            data = inq.get()
        except IOError:
            # NOTE(dhellmann): Likely caused by a signal arriving
            # during processing, especially SIGCHLD.
            data = None
        if data is None:
            target, message = None, None
        else:
            target, message = data
        try:
            worker.handle_message(target, message)
        except Exception:
            # Let the logging module format ``data`` lazily. The
            # previous ``unicode(data)`` call is a NameError on
            # Python 3, and an error raised here would escape the
            # handler and terminate the worker loop.
            LOG.exception('Error processing data %s', data)
        if data is None:
            # The shutdown item has been delivered to the worker.
            break
    LOG.debug('exiting')
|
|
|
|
|
|
class Dispatcher(object):
    """Select which worker(s) should receive a message.

    A target is parsed as a UUID; the UUID's integer value modulo the
    number of workers selects the worker, so a given target always
    maps to the same position in the worker list.
    """

    def __init__(self, workers):
        """
        :param workers: Sequence of workers to choose from.
        """
        self.workers = workers

    def pick_workers(self, target):
        """Return the workers that should handle ``target``.

        :param target: UUID string identifying the resource, or one of
                       the values in ``commands.WILDCARDS``.
        :returns: A copy of the full worker sequence for a wildcard
                  target, a one-element list for a valid UUID, or an
                  empty list when the target cannot be parsed as a
                  UUID.
        """
        # Surrounding whitespace is dropped; a falsy target becomes None.
        if target:
            cleaned = target.strip()
        else:
            cleaned = None

        # Any wildcard target is broadcast to every worker.
        if cleaned in commands.WILDCARDS:
            return self.workers[:]

        try:
            slot = uuid.UUID(cleaned).int % len(self.workers)
        except (TypeError, ValueError) as err:
            LOG.warning(
                'could not determine UUID from %r: %s, ignoring message',
                cleaned, err,
            )
            return []

        LOG.debug('target %s maps to worker %s', cleaned, slot)
        return [self.workers[slot]]
|
|
|
|
|
|
class Scheduler(object):
    """Manages a pool of worker processes and distributes messages to them.
    """

    def __init__(self, num_workers, worker_factory):
        """
        :param num_workers: The number of worker processes to create.
        :type num_workers: int
        :param worker_factory: Callable passed to every worker process,
                               which uses it to create the Worker
                               instance that handles messages.
        :type worker_factory: callable
        :raises ValueError: if ``num_workers`` is less than 1.
        """
        if num_workers < 1:
            raise ValueError('Need at least one worker process')
        self.num_workers = num_workers
        self.workers = []
        # Every worker process gets a private queue; handle_message()
        # uses those queues to pass instructions to the processes.
        for index in range(self.num_workers):
            queue = multiprocessing.JoinableQueue()
            process = multiprocessing.Process(
                target=_worker,
                kwargs=dict(inq=queue, worker_factory=worker_factory),
                name='p%02d' % index,
            )
            process.start()
            self.workers.append(dict(queue=queue, worker=process))
        self.dispatcher = Dispatcher(self.workers)

    def stop(self):
        """Shut down all of the workers cleanly.

        A ``None`` item is queued for every worker first; afterwards
        each queue is closed and each worker process is joined.
        """
        LOG.info('shutting down scheduler')
        # First pass: hand every worker its stop message.
        for entry in self.workers:
            LOG.debug('sending stop message to %s', entry['worker'].name)
            entry['queue'].put(None)
        # Second pass: close each queue and wait for its process.
        for entry in self.workers:
            process = entry['worker']
            LOG.debug('waiting for queue for %s', process.name)
            entry['queue'].close()
            LOG.debug('waiting for worker %s', process.name)
            process.join()
        LOG.info('scheduler shutdown')

    def handle_message(self, target, message):
        """Deliver a new notification message to the right worker(s).

        The dispatcher chooses the workers for ``target`` and the
        ``(target, message)`` pair is put on each chosen worker's
        queue.

        :param target: UUID of the resource that needs to get the message.
        :type target: uuid
        :param message: Dictionary full of data to send to the target.
        :type message: dict
        """
        item = (target, message)
        for entry in self.dispatcher.pick_workers(target):
            entry['queue'].put(item)
|