diff --git a/shoebox/archive.py b/shoebox/archive.py index 2e73cd3..d95c37f 100644 --- a/shoebox/archive.py +++ b/shoebox/archive.py @@ -50,6 +50,12 @@ class ArchiveReader(Archive): """ def __init__(self, filename): super(ArchiveReader, self).__init__(filename) + self._open_file(filename) + + def _open_file(self, filename): + # Broken out for testing. + self._handle = open(filename, "rb") def read(self): - pass + # (metadata, payload) + return disk_storage.unpack_notification(self._handle) diff --git a/shoebox/disk_storage.py b/shoebox/disk_storage.py index 6497f68..07dd0c7 100644 --- a/shoebox/disk_storage.py +++ b/shoebox/disk_storage.py @@ -12,6 +12,10 @@ class OutOfSync(Exception): pass +class EndOfFile(Exception): + pass + + BOR_MAGIC_NUMBER = 0x69867884 @@ -27,10 +31,14 @@ class Version0(object): def make_preamble(self, version): return struct.pack(self.preamble_schema, BOR_MAGIC_NUMBER, version) + def _check_eof(self, expected, actual): + if actual < expected: + raise EndOfFile() + def load_preamble(self, file_handle): raw = file_handle.read(self.preamble_size) + self._check_eof(self.preamble_size, len(raw)) header = struct.unpack(self.preamble_schema, raw) - print "raw", raw if header[0] != BOR_MAGIC_NUMBER: raise OutOfSync("Expected Beginning of Record marker") return header[1] @@ -106,12 +114,14 @@ class Version1(Version0): def unpack(self, file_handle): header_bytes = file_handle.read(self.header_size) + self._check_eof(self.header_size, len(header_bytes)) header = struct.unpack(self.header_schema, header_bytes) if header[2] != 0: raise OutOfSync("Didn't find 0 EOR marker.") metadata_bytes = file_handle.read(header[0]) + self._check_eof(header[0], len(metadata_bytes)) num_strings = struct.unpack_from("i", metadata_bytes) offset = struct.calcsize("i") lengths = num_strings[0] / 2 @@ -127,12 +137,11 @@ class Version1(Version0): for n in range(len(key_values))[::2]) raw = file_handle.read(header[1]) + self._check_eof(header[1], len(raw)) raw_len = struct.unpack_from("i", raw) offset = struct.calcsize("i") - raw_json = struct.unpack_from("%ds" % raw_len[0], raw, offset=offset) - notification = json.loads(raw_json[0]) - - return (metadata, notification) + jnot = struct.unpack_from("%ds" % raw_len[0], raw, offset=offset) + return (metadata, jnot[0]) VERSIONS = {1: Version1()} diff --git a/shoebox/roll_manager.py b/shoebox/roll_manager.py index befb0a0..3f7eae2 100644 --- a/shoebox/roll_manager.py +++ b/shoebox/roll_manager.py @@ -13,12 +13,23 @@ # See the License for the specific language governing permissions and # limitations under the License. +import fnmatch +import os import os.path import archive +import disk_storage import utils +class NoMoreFiles(Exception): + pass + + +class NoValidFile(Exception): + pass + + class ArchiveCallback(object): def on_open(self, filename): """Called when an Archive is opened.""" @@ -30,35 +41,15 @@ class ArchiveCallback(object): class RollManager(object): - def __init__(self, filename_template, roll_checker, directory=".", + def __init__(self, filename_template, directory=".", archive_class=None, archive_callback=None): self.filename_template = filename_template - self.roll_checker = roll_checker self.directory = directory self.active_archive = None self.archive_class = archive_class self.active_filename = None self.archive_callback = archive_callback - def _make_filename(self): - f = utils.now().strftime(self.filename_template) - f = f.replace(" ", "_") - f = f.replace("/", "_") - return os.path.join(self.directory, f) - - def get_active_archive(self): - if not self.active_archive: - self.active_filename = self._make_filename() - self.active_archive = self.archive_class(self.active_filename) - if self.archive_callback: - self.archive_callback.on_open(self.active_filename) - self.roll_checker.start(self.active_archive) - - return self.active_archive - - def _should_roll_archive(self): - return self.roll_checker.check(self.active_archive) - def _roll_archive(self): self.close() self.get_active_archive() @@ -73,17 +64,42 @@ class RollManager(object): class ReadingRollManager(RollManager): - def __init__(self, filename_template, roll_checker, directory=".", + def __init__(self, filename_template, directory=".", archive_class = archive.ArchiveReader, archive_callback=None): - super(ReadingRollManager, self).__init__(filename_template, - roll_checker, - directory=directory, - archive_callback=event_callback, - archive_class=archive_class) + super(ReadingRollManager, self).__init__( + filename_template, + directory=directory, + archive_callback=archive_callback, + archive_class=archive_class) + self.files_to_read = self._get_matching_files(directory, + filename_template) + + def _get_matching_files(self, directory, filename_template): + files = [os.path.join(directory, f) + for f in os.listdir(self.directory) + if os.path.isfile(os.path.join(directory, f))] + return sorted(fnmatch.filter(files, filename_template)) def read(self): - pass + # (metadata, payload) + for x in range(3): # 3 attempts to file a valid file. + a = self.get_active_archive() + try: + return a.read() + except disk_storage.EndOfFile: + self._roll_archive() + raise NoValidFile("Unable to find a valid file after 3 attempts") + + def get_active_archive(self): + if not self.active_archive: + if not self.files_to_read: + raise NoMoreFiles() + self.active_filename = self.files_to_read.pop(0) + self.active_archive = self.archive_class(self.active_filename) + if self.archive_callback: + self.archive_callback.on_open(self.active_filename) + return self.active_archive class WritingRollManager(RollManager): @@ -92,10 +108,10 @@ class WritingRollManager(RollManager): archive_callback=None): super(WritingRollManager, self).__init__( filename_template, - roll_checker, directory=directory, archive_callback=archive_callback, archive_class=archive_class) + self.roll_checker = roll_checker def write(self, metadata, payload): """metadata is string:string dict. @@ -105,3 +121,22 @@ class WritingRollManager(RollManager): a.write(metadata, payload) if self._should_roll_archive(): self._roll_archive() + + def _make_filename(self): + f = utils.now().strftime(self.filename_template) + f = f.replace(" ", "_") + f = f.replace("/", "_") + f = f.replace(":", "_") + return os.path.join(self.directory, f) + + def get_active_archive(self): + if not self.active_archive: + self.active_filename = self._make_filename() + self.active_archive = self.archive_class(self.active_filename) + if self.archive_callback: + self.archive_callback.on_open(self.active_filename) + self.roll_checker.start(self.active_archive) + return self.active_archive + + def _should_roll_archive(self): + return self.roll_checker.check(self.active_archive) diff --git a/shoebox/utils.py b/shoebox/utils.py index 59220ce..862ce44 100644 --- a/shoebox/utils.py +++ b/shoebox/utils.py @@ -14,6 +14,7 @@ # limitations under the License. import calendar +import collections import datetime import decimal import json @@ -48,3 +49,17 @@ class DateTimeEncoder(json.JSONEncoder): obj = obj - obj.utcoffset() return str(dt_to_decimal(obj)) return super(DateTimeEncoder, self).default(obj) + + +# This is a hack for comparing structures load'ed from json +# (which are always unicode) back to strings. It's used +# for assertEqual() in the tests and is very slow and expensive. +def unicode_to_string(data): + if isinstance(data, basestring): + return str(data) + elif isinstance(data, collections.Mapping): + return dict(map(unicode_to_string, data.iteritems())) + elif isinstance(data, collections.Iterable): + return type(data)(map(unicode_to_string, data)) + else: + return data diff --git a/test/integration/test_rolling.py b/test/integration/test_rolling.py index 260bf47..7144b33 100644 --- a/test/integration/test_rolling.py +++ b/test/integration/test_rolling.py @@ -18,16 +18,30 @@ TEMPDIR = "test_temp" class ArchiveCallback(object): - def __init__(self, active_files): - self.active_files = active_files + def __init__(self): + self.active_files = {} + self.ordered_files = [] def on_open(self, filename): - self.active_files.add(filename) - print "Opened:", filename + self.active_files[filename] = True + self.ordered_files.append(filename) def on_close(self, filename): - self.active_files.remove(filename) - print "Closed:", filename + self.active_files[filename] = False + + +class VerifyArchiveCallback(object): + def __init__(self, original_files): + self.original_files = original_files + + def on_open(self, filename): + o = self.original_files.pop(0) + if filename != o: + raise Exception("Wrong order: Got %s, Expected %s" % + (filename, o)) + + def on_close(self, filename): + pass class TestSizeRolling(unittest.TestCase): @@ -40,11 +54,10 @@ class TestSizeRolling(unittest.TestCase): pass def test_size_rolling(self): - active_files = set() - callback = ArchiveCallback(active_files) + callback = ArchiveCallback() checker = roll_checker.SizeRollChecker(1) - manager = roll_manager.WritingRollManager("test_%Y_%m_%d_%f.events", + manager = roll_manager.WritingRollManager("test_%Y_%m_%d_%X_%f.events", checker, TEMPDIR, archive_callback=callback) @@ -69,5 +82,22 @@ class TestSizeRolling(unittest.TestCase): now = g.move_to_next_tick(now) manager.close() - raise Exception("Boom") + for filename, is_open in callback.active_files.items(): + self.assertFalse(is_open) + vcallback = VerifyArchiveCallback(callback.ordered_files) + manager = roll_manager.ReadingRollManager("test_*.events", + TEMPDIR, + archive_callback=vcallback) + + while True: + try: + # By comparing the json'ed version of + # the payloads we avoid all the issues + # with unicode and datetime->decimal conversions. + metadata, jpayload = manager.read() + orig_metadata, orig_jpayload = entries.pop(0) + self.assertEqual(orig_metadata, metadata) + self.assertEqual(orig_jpayload, jpayload) + except roll_manager.NoMoreFiles: + break diff --git a/test/test_disk_storage.py b/test/test_disk_storage.py index cdcb7a0..5083f15 100644 --- a/test/test_disk_storage.py +++ b/test/test_disk_storage.py @@ -63,7 +63,8 @@ class TestVersion1(unittest.TestCase): file_handle = mock.Mock() file_handle.read.side_effect = blocks - m, p = self.v1.unpack(file_handle) + m, jp = self.v1.unpack(file_handle) + p = json.loads(jp) self.assertEqual(metadata, m) self.assertEqual(payload, p) diff --git a/test/test_roll_manager.py b/test/test_roll_manager.py index d9806a5..a62ee4e 100644 --- a/test/test_roll_manager.py +++ b/test/test_roll_manager.py @@ -19,33 +19,9 @@ class FakeArchive(object): class TestRollManager(unittest.TestCase): - def test_make_filename(self): - now = datetime.datetime(day=1, month=2, year=2014, - hour=10, minute=11, second=12) - x = roll_manager.RollManager("filename_%c.dat", None) - - with mock.patch.object(utils, "now") as dt: - dt.return_value = now - filename = x._make_filename() - self.assertEqual(filename, - "./filename_Sat_Feb__1_10:11:12_2014.dat") - - def test_get_active_archive(self): - checker = mock.Mock() - callback = mock.Mock() - filename_template = "filename_%c.dat" - x = roll_manager.RollManager(filename_template, checker, - archive_callback=callback, - archive_class=FakeArchive) - with mock.patch("shoebox.archive.ArchiveWriter._open_file") as of: - arc = x.get_active_archive() - self.assertTrue(checker.start.called) - self.assertTrue(callback.on_open.called) - def test_close(self): callback = mock.Mock() - checker = mock.Mock() - x = roll_manager.RollManager("template", checker, + x = roll_manager.RollManager("template", archive_callback=callback) x.active_archive = mock.Mock() x.active_filename = "foo" @@ -74,13 +50,33 @@ class TestWritingRollManager(unittest.TestCase): x.write({}, "payload") self.assertFalse(ra.called) + def test_correct_archiver(self): + x = roll_manager.WritingRollManager("foo", None) + print x.archive_class + self.assertEqual(x.archive_class, archive.ArchiveWriter) + def test_get_active_archive(self): checker = mock.Mock() + callback = mock.Mock() filename_template = "filename_%c.dat" - x = roll_manager.WritingRollManager(filename_template, checker) + x = roll_manager.WritingRollManager(filename_template, checker, + archive_callback=callback, + archive_class=FakeArchive) with mock.patch("shoebox.archive.ArchiveWriter._open_file") as of: arc = x.get_active_archive() - self.assertTrue(isinstance(arc, archive.ArchiveWriter)) + self.assertTrue(checker.start.called) + self.assertTrue(callback.on_open.called) + + def test_make_filename(self): + now = datetime.datetime(day=1, month=2, year=2014, + hour=10, minute=11, second=12) + x = roll_manager.WritingRollManager("filename_%c.dat", None) + + with mock.patch.object(utils, "now") as dt: + dt.return_value = now + filename = x._make_filename() + self.assertEqual(filename, + "./filename_Sat_Feb__1_10_11_12_2014.dat") class TestWriting(unittest.TestCase):