Merge tag '0.3.8' into debian/unstable

Tagging 0.3.8
This commit is contained in:
Thomas Goirand 2015-07-06 09:23:57 +02:00
commit 2c15dcc6d2
4 changed files with 135 additions and 15 deletions

View File

@ -1,6 +1,33 @@
Changelog
==============
0.3.8 (2015-06-23)
------------------
- Truncate microseconds by setting to 0
[Corey Wright <corey.wright@rackspace.com>]
0.3.7 (2015-06-01)
------------------
- converting sun in range sun-thu transforms to int 0 which is
recognized as empty string; the solution was to convert sun to string "0"
0.3.6 (2015-05-29)
------------------
- Fix default behavior when no start_time given
Default value for `start_time` parameter is calculated at module init time rather than call time.
- Fix timezone support and stop depending on the system time zone
0.3.5 (2014-08-01)
------------------
- support for 'l' (last day of month)
0.3.4 (2014-01-30)
------------------

View File

@ -17,7 +17,7 @@ long_description = "\n\n".join(
setup(
name='croniter',
version='0.3.4',
version='0.3.8',
py_modules=['croniter', ],
description=(
'croniter provides iteration for datetime '

View File

@ -3,9 +3,10 @@
from __future__ import absolute_import, print_function
import re
from time import time, mktime
from time import time
import datetime
from dateutil.relativedelta import relativedelta
from dateutil.tz import tzutc
search_re = re.compile(r'^([^-]+)-([^-/]+)(/(.*))?$')
only_int_re = re.compile(r'^\d+$')
@ -32,7 +33,7 @@ class croniter(object):
ALPHACONV = (
{},
{},
{},
{"l": "l"},
{'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6,
'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov': 11, 'dec': 12},
{'sun': 0, 'mon': 1, 'tue': 2, 'wed': 3, 'thu': 4, 'fri': 5, 'sat': 6},
@ -51,11 +52,14 @@ class croniter(object):
bad_length = 'Exactly 5 or 6 columns has to be specified for iterator' \
'expression.'
def __init__(self, expr_format, start_time=time()):
def __init__(self, expr_format, start_time=None):
if start_time is None:
start_time = time()
self.tzinfo = None
if isinstance(start_time, datetime.datetime):
self.tzinfo = start_time.tzinfo
start_time = mktime(start_time.timetuple())
start_time = self._datetime_to_timestamp(start_time)
self.cur = start_time
self.exprs = expr_format.split()
@ -81,10 +85,10 @@ class croniter(object):
(low, high, step) = m.group(1), m.group(2), m.group(4) or 1
if not any_int_re.search(low):
low = self.ALPHACONV[i][low.lower()]
low = "{0}".format(self.ALPHACONV[i][low.lower()])
if not any_int_re.search(high):
high = self.ALPHACONV[i][high.lower()]
high = "{0}".format(self.ALPHACONV[i][high.lower()])
if (
not low or not high or int(low) > int(high)
@ -116,8 +120,11 @@ class croniter(object):
if t in self.LOWMAP[i]:
t = self.LOWMAP[i][t]
if t != '*' and (int(t) < self.RANGES[i][0] or
int(t) > self.RANGES[i][1]):
if (
t not in ["*", "l"]
and (int(t) < self.RANGES[i][0] or
int(t) > self.RANGES[i][1])
):
raise ValueError(
"[{0}] is not acceptable, out of range".format(
expr_format))
@ -138,9 +145,28 @@ class croniter(object):
def get_current(self, ret_type=float):
if ret_type == datetime.datetime:
return datetime.datetime.fromtimestamp(self.cur)
return self._timestamp_to_datetime(self.cur)
return self.cur
def _datetime_to_timestamp(self, d):
"""
Converts a `datetime` object `d` into a UNIX timestamp.
"""
if d.tzinfo is not None:
d = d.replace(tzinfo=None) - d.utcoffset()
return (d - datetime.datetime(1970, 1, 1)).total_seconds()
def _timestamp_to_datetime(self, timestamp):
"""
Converts a UNIX timestamp `timestamp` into a `datetime` object.
"""
result = datetime.datetime.utcfromtimestamp(timestamp)
if self.tzinfo:
result = result.replace(tzinfo=tzutc()).astimezone(self.tzinfo)
return result
# iterator protocol, to enable direct use of croniter
# objects in a loop, like "for dt in croniter('5 0 * * *'): ..."
# or for combining multiple croniters into single
@ -188,9 +214,8 @@ class croniter(object):
self.cur = result
if ret_type == datetime.datetime:
result = datetime.datetime.fromtimestamp(result)
if self.tzinfo:
result = self.tzinfo.localize(result)
result = self._timestamp_to_datetime(result)
return result
def _calc(self, now, expanded, is_prev):
@ -202,7 +227,7 @@ class croniter(object):
sign = 1
offset = len(expanded) == 6 and 1 or 60
dst = now = datetime.datetime.fromtimestamp(now + sign * offset)
dst = now = self._timestamp_to_datetime(now + sign * offset)
day, month, year = dst.day, dst.month, dst.year
current_year = now.year
@ -318,7 +343,7 @@ class croniter(object):
break
if next:
continue
return mktime(dst.timetuple())
return self._datetime_to_timestamp(dst.replace(microsecond=0))
raise Exception("failed to find prev date")
@ -338,6 +363,10 @@ class croniter(object):
def _get_next_nearest_diff(self, x, to_check, range_val):
for i, d in enumerate(to_check):
if d == "l":
# if 'l' then it is the last day of month
# => its value of range_val
d = range_val
if d >= x:
return d - x
return to_check[0] - x + range_val

64
src/croniter/tests/test_croniter.py Normal file → Executable file
View File

@ -3,6 +3,7 @@
import unittest
from datetime import datetime
from time import sleep
import pytz
from croniter import croniter
@ -170,6 +171,17 @@ class CroniterTest(base.TestCase):
self.assertRaises(ValueError, croniter, '* * 5-1 * *')
self.assertRaises(KeyError, croniter, '* * * janu-jun *')
def testSundayToThursdayWithAlphaConversion(self):
base = datetime(2010, 8, 25, 15, 56) #wednesday
itr = croniter("30 22 * * sun-thu", base)
next = itr.get_next(datetime)
self.assertEqual(base.year, next.year)
self.assertEqual(base.month, next.month)
self.assertEqual(base.day, next.day)
self.assertEqual(22, next.hour)
self.assertEqual(30, next.minute)
def testPrevMinute(self):
base = datetime(2010, 8, 25, 15, 56)
itr = croniter('*/1 * * * *', base)
@ -392,6 +404,58 @@ class CroniterTest(base.TestCase):
n2 = itr2.get_next(datetime)
self.assertEqual(n2.tzinfo.zone, 'Asia/Tokyo')
def testInitNoStartTime(self):
itr = croniter('* * * * *')
sleep(.01)
itr2 = croniter('* * * * *')
self.assertGreater(itr2.cur, itr.cur)
def assertScheduleTimezone(self, callback, expected_schedule):
for expected_date, expected_offset in expected_schedule:
d = callback()
self.assertEqual(expected_date, d.replace(tzinfo=None))
self.assertEqual(expected_offset, d.utcoffset().total_seconds())
def testTimezoneWinterTime(self):
tz = pytz.timezone('Europe/Athens')
expected_schedule = [
(datetime(2013, 10, 27, 2, 30, 0), 10800),
(datetime(2013, 10, 27, 3, 0, 0), 10800),
(datetime(2013, 10, 27, 3, 30, 0), 10800),
(datetime(2013, 10, 27, 3, 0, 0), 7200),
(datetime(2013, 10, 27, 3, 30, 0), 7200),
(datetime(2013, 10, 27, 4, 0, 0), 7200),
(datetime(2013, 10, 27, 4, 30, 0), 7200),
]
start = datetime(2013, 10, 27, 2, 0, 0)
ct = croniter('*/30 * * * *', tz.localize(start))
self.assertScheduleTimezone(lambda: ct.get_next(datetime), expected_schedule)
start = datetime(2013, 10, 27, 5, 0, 0)
ct = croniter('*/30 * * * *', tz.localize(start))
self.assertScheduleTimezone(lambda: ct.get_prev(datetime), reversed(expected_schedule))
def testTimezoneSummerTime(self):
tz = pytz.timezone('Europe/Athens')
expected_schedule = [
(datetime(2013, 3, 31, 1, 30, 0), 7200),
(datetime(2013, 3, 31, 2, 0, 0), 7200),
(datetime(2013, 3, 31, 2, 30, 0), 7200),
(datetime(2013, 3, 31, 4, 0, 0), 10800),
(datetime(2013, 3, 31, 4, 30, 0), 10800),
]
start = datetime(2013, 3, 31, 1, 0, 0)
ct = croniter('*/30 * * * *', tz.localize(start))
self.assertScheduleTimezone(lambda: ct.get_next(datetime), expected_schedule)
start = datetime(2013, 3, 31, 5, 0, 0)
ct = croniter('*/30 * * * *', tz.localize(start))
self.assertScheduleTimezone(lambda: ct.get_prev(datetime), reversed(expected_schedule))
if __name__ == '__main__':
unittest.main()