[AMQP 1.0] small fixes to improve timer scalability

This patch introduces the following tweaks to the timer
implementation:

- Reduce the number of timers that need to be tracked by coarsening the
  timer granularity to whole seconds (see the sketch after this list).

- Decrease the default timeout values to further reduce the total number
  of tracked timers.

- Batch multiple expiring events that share the same deadline.

- Inline the timer comparison code in the main event loop.

- Avoid an expensive rich-comparison method in the heap sort by keying
  the heap on an integer primitive instead.

- Use monotonic time instead of time.time(), so deadlines are unaffected
  by wall-clock adjustments.
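
The combined effect of the granularity and clock changes is easiest to see in
one helper. A minimal sketch (not part of the patch), using Python 3's
time.monotonic() in place of the monotonic package this patch depends on:

    import math
    import time


    def compute_timeout(offset):
        # Round the deadline up to the next whole second on a monotonic
        # clock: deadlines become small numbers that are cheap to compare
        # in a heap and that frequently collide, so events sharing a
        # deadline can be expired together in one batch.
        return math.ceil(time.monotonic() + offset)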

Change-Id: I83e86bf203e6a641085e482c7ccf0e01f4fb4d86
Author: Kenneth Giusti
Date:   2016-07-29 12:27:55 -04:00
Commit: b7717e1616
Parent: 1bf6eeaaf9
6 changed files with 78 additions and 47 deletions


@@ -27,12 +27,12 @@ functions scheduled by the Controller.
 import abc
 import collections
 import logging
+from monotonic import monotonic as now  # noqa
 import os
 import platform
 import random
 import sys
 import threading
-import time
 import uuid
 
 import proton
@@ -878,7 +878,7 @@ class Controller(pyngus.ConnectionEventHandler):
     # methods executed by Tasks created by the driver:
 
     def send(self, send_task):
-        if send_task.deadline and send_task.deadline <= time.time():
+        if send_task.deadline and send_task.deadline <= now():
             send_task._on_timeout()
             return
         LOG.debug("Sending message to %s", send_task.target)

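Aside (illustration only, not part of the diff): deadlines derived from
time.time() follow the wall clock, which NTP or an operator can move backwards
or forwards, so a pending send could time out early or never. A monotonic
clock only moves forward:

    import time

    deadline = time.monotonic() + 30        # expires ~30 seconds from now
    # ... later, in the I/O loop ...
    expired = time.monotonic() >= deadline  # immune to wall-clock jumps
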

@@ -25,11 +25,12 @@ the background thread via callables.
 import errno
 import heapq
 import logging
+import math
+from monotonic import monotonic as now  # noqa
 import os
 import select
 import socket
 import threading
-import time
 import uuid
 
 import pyngus
@@ -39,6 +40,12 @@ from oslo_messaging._i18n import _LE, _LI, _LW
 LOG = logging.getLogger(__name__)
 
 
+def compute_timeout(offset):
+    # minimize the timer granularity to one second so we don't have to track
+    # too many timers
+    return math.ceil(now() + offset)
+
+
 class _SocketConnection(object):
     """Associates a pyngus Connection with a python network socket,
     and handles all connection-related I/O and timer events.
@@ -65,7 +72,7 @@ class _SocketConnection(object):
         while True:
            try:
                 rc = pyngus.read_socket_input(self.connection, self.socket)
-                self.connection.process(time.time())
+                self.connection.process(now())
                 return rc
             except (socket.timeout, socket.error) as e:
                 # pyngus handles EAGAIN/EWOULDBLOCK and EINTER
@@ -79,7 +86,7 @@ class _SocketConnection(object):
         while True:
             try:
                 rc = pyngus.write_socket_output(self.connection, self.socket)
-                self.connection.process(time.time())
+                self.connection.process(now())
                 return rc
             except (socket.timeout, socket.error) as e:
                 # pyngus handles EAGAIN/EWOULDBLOCK and EINTER
@@ -161,52 +168,70 @@ class Scheduler(object):
     """Schedule callables to be run in the future.
     """
     class Event(object):
-        def __init__(self, callback, deadline):
-            self._callback = callback
-            self._deadline = deadline
+        # simply hold a reference to a callback that can be set to None if the
+        # alarm is canceled
+        def __init__(self, callback):
+            self.callback = callback
 
         def cancel(self):
             # quicker than rebalancing the tree
-            self._callback = None
-
-        def __lt__(self, other):
-            return self._deadline < other._deadline
+            self.callback = None
 
     def __init__(self):
-        self._entries = []
+        self._callbacks = {}
+        self._deadlines = []
 
     def alarm(self, request, deadline):
         """Request a callable be executed at a specific time
         """
-        entry = Scheduler.Event(request, deadline)
-        heapq.heappush(self._entries, entry)
+        try:
+            callbacks = self._callbacks[deadline]
+        except KeyError:
+            callbacks = list()
+            self._callbacks[deadline] = callbacks
+            heapq.heappush(self._deadlines, deadline)
+        entry = Scheduler.Event(request)
+        callbacks.append(entry)
         return entry
 
     def defer(self, request, delay):
         """Request a callable be executed after delay seconds
         """
-        return self.alarm(request, time.time() + delay)
+        return self.alarm(request, compute_timeout(delay))
+
+    @property
+    def _next_deadline(self):
+        """The timestamp of the next expiring event or None
+        """
+        return self._deadlines[0] if self._deadlines else None
 
     def _get_delay(self, max_delay=None):
         """Get the delay in milliseconds until the next callable needs to be
         run, or 'max_delay' if no outstanding callables or the delay to the
         next callable is > 'max_delay'.
         """
-        due = self._entries[0]._deadline if self._entries else None
+        due = self._deadlines[0] if self._deadlines else None
         if due is None:
             return max_delay
-        now = time.time()
-        if due <= now:
+        _now = now()
+        if due <= _now:
             return 0
         else:
-            return min(due - now, max_delay) if max_delay else due - now
+            return min(due - _now, max_delay) if max_delay else due - _now
 
     def _process(self):
         """Invoke all expired callables."""
-        while self._entries and self._entries[0]._deadline <= time.time():
-            callback = heapq.heappop(self._entries)._callback
-            if callback:
-                callback()
+        if self._deadlines:
+            _now = now()
+            try:
+                while self._deadlines[0] <= _now:
+                    deadline = heapq.heappop(self._deadlines)
+                    callbacks = self._callbacks[deadline]
+                    del self._callbacks[deadline]
+                    for cb in callbacks:
+                        cb.callback and cb.callback()
+            except IndexError:
+                pass
 
 
 class Requests(object):
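
Usage sketch (not part of the diff) for the reworked Scheduler above: defer()
coarsens the delay through compute_timeout(), events that resolve to the same
whole-second deadline share one heap entry, and cancel() simply drops the
callback reference instead of re-balancing the heap; the dead entry is skipped
when its deadline pops in _process():

    sched = Scheduler()

    # two requests resolving to the same whole-second deadline are batched
    # under one heap entry keyed by the plain numeric deadline (no
    # Event.__lt__ comparisons during the heap sort)
    deadline = compute_timeout(10)
    e1 = sched.alarm(lambda: LOG.debug("first expired"), deadline)
    sched.alarm(lambda: LOG.debug("second expired"), deadline)

    e1.cancel()   # O(1): clears the callback reference, no heap re-balance

    # the I/O loop drives expiry with these two calls
    timeout = sched._get_delay(max_delay=5)   # delay until the next deadline
    sched._process()                          # run every expired callback
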
@@ -325,15 +350,17 @@ class Thread(threading.Thread):
             readfds.append(self._requests)
             writefds = [c.user_context for c in writers]
 
-            timeout = None
-            if timers:
-                deadline = timers[0].deadline  # 0 == next expiring timer
-                now = time.time()
-                timeout = 0 if deadline <= now else deadline - now
-
-            # adjust timeout for any deferred requests
-            timeout = self._scheduler._get_delay(timeout)
+            # force select to return in time to service the next expiring timer
+            d1 = self._scheduler._next_deadline
+            d2 = timers[0].deadline if timers else None
+            deadline = min(d1, d2) if d1 and d2 else d1 if not d2 else d2
+            if deadline:
+                _now = now()
+                timeout = 0 if deadline <= _now else (deadline - _now)
+            else:
+                timeout = None
 
+            # and now we wait...
             try:
                 results = select.select(readfds, writefds, [], timeout)
             except select.error as serror:
@@ -348,10 +375,12 @@
             for r in readable:
                 r.read()
 
-            for t in timers:
-                if t.deadline > time.time():
-                    break
-                t.process(time.time())
+            if timers:
+                _now = now()
+                for t in timers:
+                    if t.deadline > _now:
+                        break
+                    t.process(_now)
 
             for w in writable:
                 w.write()

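Sketch (not part of the diff) of the timeout selection the run loop now
performs inline instead of calling _get_delay(): take the sooner of the
scheduler's next deadline and the next pyngus connection timer, both plain
whole-second monotonic values, and turn it into a select() timeout. The
function name here is hypothetical:

    def pick_select_timeout(scheduler_deadline, pyngus_deadline, monotonic_now):
        # either argument may be None when nothing is pending
        d1, d2 = scheduler_deadline, pyngus_deadline
        deadline = min(d1, d2) if d1 and d2 else d1 if not d2 else d2
        if deadline:
            return 0 if deadline <= monotonic_now else deadline - monotonic_now
        return None   # no pending timers: block in select() until I/O arrives
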

@@ -125,13 +125,13 @@ amqp1_opts = [
                     ' Only used when caller does not provide a timeout expiry.'),
 
     cfg.IntOpt('default_send_timeout',
-               default=60,
+               default=30,
                min=5,
                help='The deadline for an rpc cast or call message delivery.'
                     ' Only used when caller does not provide a timeout expiry.'),
 
     cfg.IntOpt('default_notify_timeout',
-               default=60,
+               default=30,
                min=5,
                help='The deadline for a sent notification message delivery.'
                     ' Only used when caller does not provide a timeout expiry.'),

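The lowered defaults only apply when the caller does not supply its own
timeout; deployments that need the previous behaviour can still override them.
A sketch using oslo.config programmatically (the "oslo_messaging_amqp" group
name is an assumption here, as it is not shown in this hunk):

    from oslo_config import cfg
    from oslo_messaging._drivers.amqp1_driver.opts import amqp1_opts

    conf = cfg.ConfigOpts()
    conf.register_opts(amqp1_opts, group="oslo_messaging_amqp")  # assumed group
    conf.set_override("default_send_timeout", 60, group="oslo_messaging_amqp")
    conf.set_override("default_notify_timeout", 60, group="oslo_messaging_amqp")
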

@@ -24,7 +24,6 @@ import collections
 import logging
 import os
 import threading
-import time
 import uuid
 
 from oslo_config import cfg
@@ -33,6 +32,7 @@ from oslo_serialization import jsonutils
 from oslo_utils import importutils
 from oslo_utils import timeutils
 
+from oslo_messaging._drivers.amqp1_driver.eventloop import compute_timeout
 from oslo_messaging._drivers.amqp1_driver import opts
 from oslo_messaging._drivers import base
 from oslo_messaging._drivers import common
@@ -105,14 +105,14 @@ class ProtonIncomingMessage(base.RpcIncomingMessage):
             response.correlation_id = self._correlation_id
             LOG.debug("Sending RPC reply to %s (%s)", self._reply_to,
                       self._correlation_id)
-            now = time.time()
-            deadline = now + self.listener.driver._default_reply_timeout
+            driver = self.listener.driver
+            deadline = compute_timeout(driver._default_reply_timeout)
             task = controller.SendTask("RPC Reply", response, self._reply_to,
                                        # analogous to kombu missing dest t/o:
                                        deadline,
                                        retry=0,
                                        wait_for_ack=True)
-            self.listener.driver._ctrl.add_task(task)
+            driver._ctrl.add_task(task)
             rc = task.wait()
             if rc:
                 # something failed. Not much we can do at this point but log
@@ -289,7 +289,7 @@ class ProtonDriver(base.BaseDriver):
         request = marshal_request(message, ctxt, envelope)
         expire = 0
         if timeout:
-            expire = time.time() + timeout  # when the caller times out
+            expire = compute_timeout(timeout)  # when the caller times out
             # amqp uses millisecond time values, timeout is seconds
             request.ttl = int(timeout * 1000)
             request.expiry_time = int(expire * 1000)
@@ -297,7 +297,7 @@
             # no timeout provided by application. If the backend is queueless
             # this could lead to a hang - provide a default to prevent this
             # TODO(kgiusti) only do this if brokerless backend
-            expire = time.time() + self._default_send_timeout
+            expire = compute_timeout(self._default_send_timeout)
         LOG.debug("Sending message to %s", target)
         if wait_for_reply:
             task = controller.RPCCallTask(target, request, expire, retry)
@@ -343,9 +343,10 @@
         # this
         # TODO(kgiusti) should raise NotImplemented if not broker backend
         LOG.debug("Send notification to %s", target)
+        deadline = compute_timeout(self._default_notify_timeout)
         task = controller.SendTask("Notify", request, target,
-                                   time.time() + self._default_notify_timeout,
-                                   retry, wait_for_ack=True, notification=True)
+                                   deadline, retry, wait_for_ack=True,
+                                   notification=True)
         self._ctrl.add_task(task)
         rc = task.wait()
         if isinstance(rc, Exception):

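Worked example (not part of the diff) of the unit handling visible above: the
driver keeps deadlines in whole monotonic seconds and only scales to
milliseconds when stamping the AMQP message fields, per the comment in the
hunk:

    import math

    timeout = 2.5                        # seconds, supplied by the caller
    _now = 1000.3                        # pretend monotonic() reading
    expire = math.ceil(_now + timeout)   # -> 1003, same rounding as compute_timeout()
    ttl_ms = int(timeout * 1000)         # -> 2500, request.ttl
    expiry_ms = int(expire * 1000)       # -> 1003000, request.expiry_time
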

@@ -357,7 +357,7 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto):
         driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
         target = oslo_messaging.Target(topic="test-topic")
         listener = _SlowResponder(
-            driver.listen(target, None, None)._poll_style_listener, 1)
+            driver.listen(target, None, None)._poll_style_listener, 3)
 
         self.assertRaises(oslo_messaging.MessagingTimeout,
                           driver.send, target,


@@ -14,6 +14,7 @@ oslo.service>=1.10.0 # Apache-2.0
 oslo.i18n>=2.1.0 # Apache-2.0
 stevedore>=1.10.0 # Apache-2.0
 debtcollector>=1.2.0 # Apache-2.0
+monotonic>=0.6 # Apache-2.0
 
 # for jsonutils
 six>=1.9.0 # MIT
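
Note (not part of the patch): the monotonic package backports a monotonic
clock to Python 2; on Python 3.3+ the same value is available from the
standard library. A version-tolerant alias would look like this (the patch
itself simply imports "monotonic as now" from the package):

    try:
        from monotonic import monotonic as now   # external package (py2)
    except ImportError:
        from time import monotonic as now        # stdlib on py3.3+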