oslo.utils/oslo_utils/timeutils.py

476 lines
14 KiB
Python

# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Time related utilities and helper functions.
"""
import calendar
import datetime
import time
from debtcollector import removals
import iso8601
from monotonic import monotonic as now # noqa
from pytz import timezone
import six
from oslo_utils import reflection
# ISO 8601 extended time format with microseconds
_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f'
_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND
_MAX_DATETIME_SEC = 59
@removals.remove(
message="use datetime.datetime.isoformat()",
version="1.6",
removal_version="?",
)
def isotime(at=None, subsecond=False):
"""Stringify time in ISO 8601 format.
.. deprecated:: > 1.5.0
Use :func:`utcnow` and :func:`datetime.datetime.isoformat` instead.
"""
if not at:
at = utcnow()
st = at.strftime(_ISO8601_TIME_FORMAT
if not subsecond
else _ISO8601_TIME_FORMAT_SUBSECOND)
tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
st += ('Z' if tz == 'UTC' else tz)
return st
def parse_isotime(timestr):
"""Parse time from ISO 8601 format."""
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
raise ValueError(six.text_type(e))
except TypeError as e:
raise ValueError(six.text_type(e))
@removals.remove(
message="use either datetime.datetime.isoformat() "
"or datetime.datetime.strftime() instead",
version="1.6",
removal_version="?",
)
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
"""Returns formatted utcnow.
.. deprecated:: > 1.5.0
Use :func:`utcnow()`, :func:`datetime.datetime.isoformat`
or :func:`datetime.strftime` instead.
strtime() => utcnow().isoformat()
strtime(fmt=...) => utcnow().strftime(fmt)
strtime(at) => at.isoformat()
strtime(at, fmt) => at.strftime(fmt)
"""
if not at:
at = utcnow()
return at.strftime(fmt)
def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
"""Turn a formatted time back into a datetime."""
return datetime.datetime.strptime(timestr, fmt)
def normalize_time(timestamp):
"""Normalize time in arbitrary timezone to UTC naive object."""
offset = timestamp.utcoffset()
if offset is None:
return timestamp
return timestamp.replace(tzinfo=None) - offset
def is_older_than(before, seconds):
"""Return True if before is older than seconds."""
if isinstance(before, six.string_types):
before = parse_isotime(before)
before = normalize_time(before)
return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds):
"""Return True if after is newer than seconds."""
if isinstance(after, six.string_types):
after = parse_isotime(after)
after = normalize_time(after)
return after - utcnow() > datetime.timedelta(seconds=seconds)
def utcnow_ts(microsecond=False):
"""Timestamp version of our utcnow function.
See :py:class:`oslo_utils.fixture.TimeFixture`.
"""
if utcnow.override_time is None:
# NOTE(kgriffs): This is several times faster
# than going through calendar.timegm(...)
timestamp = time.time()
if not microsecond:
timestamp = int(timestamp)
return timestamp
now = utcnow()
timestamp = calendar.timegm(now.timetuple())
if microsecond:
timestamp += float(now.microsecond) / 1000000
return timestamp
def utcnow(with_timezone=False):
"""Overridable version of utils.utcnow that can return a TZ-aware datetime.
See :py:class:`oslo_utils.fixture.TimeFixture`.
"""
if utcnow.override_time:
try:
return utcnow.override_time.pop(0)
except AttributeError:
return utcnow.override_time
if with_timezone:
return datetime.datetime.now(tz=iso8601.iso8601.UTC)
return datetime.datetime.utcnow()
@removals.remove(
message="use datetime.datetime.utcfromtimestamp().isoformat()",
version="1.6",
removal_version="?",
)
def iso8601_from_timestamp(timestamp, microsecond=False):
"""Returns an iso8601 formatted date from timestamp.
.. deprecated:: > 1.5.0
Use :func:`datetime.datetime.utcfromtimestamp` and
:func:`datetime.datetime.isoformat` instead.
"""
return isotime(datetime.datetime.utcfromtimestamp(timestamp), microsecond)
utcnow.override_time = None
def set_time_override(override_time=None):
"""Overrides utils.utcnow.
Make it return a constant time or a list thereof, one at a time.
See :py:class:`oslo_utils.fixture.TimeFixture`.
:param override_time: datetime instance or list thereof. If not
given, defaults to the current UTC time.
"""
utcnow.override_time = override_time or datetime.datetime.utcnow()
def advance_time_delta(timedelta):
"""Advance overridden time using a datetime.timedelta.
See :py:class:`oslo_utils.fixture.TimeFixture`.
"""
assert utcnow.override_time is not None
try:
for dt in utcnow.override_time:
dt += timedelta
except TypeError:
utcnow.override_time += timedelta
def advance_time_seconds(seconds):
"""Advance overridden time by seconds.
See :py:class:`oslo_utils.fixture.TimeFixture`.
"""
advance_time_delta(datetime.timedelta(0, seconds))
def clear_time_override():
"""Remove the overridden time.
See :py:class:`oslo_utils.fixture.TimeFixture`.
"""
utcnow.override_time = None
def marshall_now(now=None):
"""Make an rpc-safe datetime with microseconds."""
if not now:
now = utcnow()
d = dict(day=now.day, month=now.month, year=now.year, hour=now.hour,
minute=now.minute, second=now.second,
microsecond=now.microsecond)
if now.tzinfo:
d['tzname'] = now.tzinfo.tzname(None)
return d
def unmarshall_time(tyme):
"""Unmarshall a datetime dict."""
# NOTE(ihrachys): datetime does not support leap seconds,
# so the best thing we can do for now is dropping them
# http://bugs.python.org/issue23574
second = min(tyme['second'], _MAX_DATETIME_SEC)
dt = datetime.datetime(day=tyme['day'],
month=tyme['month'],
year=tyme['year'],
hour=tyme['hour'],
minute=tyme['minute'],
second=second,
microsecond=tyme['microsecond'])
tzname = tyme.get('tzname')
if tzname:
tzinfo = timezone(tzname)
dt = tzinfo.localize(dt)
return dt
def delta_seconds(before, after):
"""Return the difference between two timing objects.
Compute the difference in seconds between two date, time, or
datetime objects (as a float, to microsecond resolution).
"""
delta = after - before
return total_seconds(delta)
def total_seconds(delta):
"""Return the total seconds of datetime.timedelta object.
Compute total seconds of datetime.timedelta, datetime.timedelta
doesn't have method total_seconds in Python2.6, calculate it manually.
"""
try:
return delta.total_seconds()
except AttributeError:
return ((delta.days * 24 * 3600) + delta.seconds +
float(delta.microseconds) / (10 ** 6))
def is_soon(dt, window):
"""Determines if time is going to happen in the next window seconds.
:param dt: the time
:param window: minimum seconds to remain to consider the time not soon
:return: True if expiration is within the given duration
"""
soon = (utcnow() + datetime.timedelta(seconds=window))
return normalize_time(dt) <= soon
class Split(object):
"""A *immutable* stopwatch split.
See: http://en.wikipedia.org/wiki/Stopwatch for what this is/represents.
"""
__slots__ = ['_elapsed', '_length']
def __init__(self, elapsed, length):
self._elapsed = elapsed
self._length = length
@property
def elapsed(self):
"""Duration from stopwatch start."""
return self._elapsed
@property
def length(self):
"""Seconds from last split (or the elapsed time if no prior split)."""
return self._length
def __repr__(self):
r = reflection.get_class_name(self, fully_qualified=False)
r += "(elapsed=%s, length=%s)" % (self._elapsed, self._length)
return r
class StopWatch(object):
"""A simple timer/stopwatch helper class.
Inspired by: apache-commons-lang java stopwatch.
Not thread-safe (when a single watch is mutated by multiple threads at
the same time). Thread-safe when used by a single thread (not shared) or
when operations are performed in a thread-safe manner on these objects by
wrapping those operations with locks.
It will try to use ``time.monotonic`` and then attempt to use the
`monotonic`_ pypi library and then fallback to using the non-monotonic
``time.time``.
.. _monotonic: https://pypi.python.org/pypi/monotonic/
"""
_STARTED = 'STARTED'
_STOPPED = 'STOPPED'
def __init__(self, duration=None):
if duration is not None and duration < 0:
raise ValueError("Duration must be greater or equal to"
" zero and not %s" % duration)
self._duration = duration
self._started_at = None
self._stopped_at = None
self._state = None
self._splits = []
def start(self):
"""Starts the watch (if not already started).
NOTE(harlowja): resets any splits previously captured (if any).
"""
if self._state == self._STARTED:
return self
self._started_at = now()
self._stopped_at = None
self._state = self._STARTED
self._splits = []
return self
@property
def splits(self):
"""Accessor to all/any splits that have been captured."""
return tuple(self._splits)
def split(self):
"""Captures a split/elapsed since start time (and doesn't stop)."""
if self._state == self._STARTED:
elapsed = self.elapsed()
if self._splits:
length = self._delta_seconds(self._splits[-1].elapsed, elapsed)
else:
length = elapsed
self._splits.append(Split(elapsed, length))
return self._splits[-1]
else:
raise RuntimeError("Can not create a split time of a stopwatch"
" if it has not been started or if it has been"
" stopped")
def restart(self):
"""Restarts the watch from a started/stopped state."""
if self._state == self._STARTED:
self.stop()
self.start()
return self
@staticmethod
def _delta_seconds(earlier, later):
# Uses max to avoid the delta/time going backwards (and thus negative).
return max(0.0, later - earlier)
def elapsed(self, maximum=None):
"""Returns how many seconds have elapsed."""
if self._state not in (self._STARTED, self._STOPPED):
raise RuntimeError("Can not get the elapsed time of a stopwatch"
" if it has not been started/stopped")
if self._state == self._STOPPED:
elapsed = self._delta_seconds(self._started_at, self._stopped_at)
else:
elapsed = self._delta_seconds(self._started_at, now())
if maximum is not None and elapsed > maximum:
elapsed = max(0.0, maximum)
return elapsed
def __enter__(self):
"""Starts the watch."""
self.start()
return self
def __exit__(self, type, value, traceback):
"""Stops the watch (ignoring errors if stop fails)."""
try:
self.stop()
except RuntimeError:
pass
def leftover(self, return_none=False):
"""Returns how many seconds are left until the watch expires.
:param return_none: when ``True`` instead of raising a ``RuntimeError``
when no duration has been set this call will
return ``None`` instead.
:type return_none: boolean
"""
if self._state != self._STARTED:
raise RuntimeError("Can not get the leftover time of a stopwatch"
" that has not been started")
if self._duration is None:
if not return_none:
raise RuntimeError("Can not get the leftover time of a watch"
" that has no duration")
return None
return max(0.0, self._duration - self.elapsed())
def expired(self):
"""Returns if the watch has expired (ie, duration provided elapsed)."""
if self._state not in (self._STARTED, self._STOPPED):
raise RuntimeError("Can not check if a stopwatch has expired"
" if it has not been started/stopped")
if self._duration is None:
return False
return self.elapsed() > self._duration
def has_started(self):
return self._state == self._STARTED
def has_stopped(self):
return self._state == self._STOPPED
def resume(self):
"""Resumes the watch from a stopped state."""
if self._state == self._STOPPED:
self._state = self._STARTED
return self
else:
raise RuntimeError("Can not resume a stopwatch that has not been"
" stopped")
def stop(self):
"""Stops the watch."""
if self._state == self._STOPPED:
return self
if self._state != self._STARTED:
raise RuntimeError("Can not stop a stopwatch that has not been"
" started")
self._stopped_at = now()
self._state = self._STOPPED
return self