reading works and integration end-to-end tests in place

This commit is contained in:
Sandy Walsh 2014-05-14 20:59:48 +00:00
parent 2bb11b0d31
commit 12878c8ce3
7 changed files with 165 additions and 73 deletions

View File

@ -50,6 +50,12 @@ class ArchiveReader(Archive):
"""
def __init__(self, filename):
super(ArchiveReader, self).__init__(filename)
self._open_file(filename)
def _open_file(self, filename):
# Broken out for testing.
self._handle = open(filename, "rb")
def read(self):
pass
# (metadata, payload)
return disk_storage.unpack_notification(self._handle)

View File

@ -12,6 +12,10 @@ class OutOfSync(Exception):
pass
class EndOfFile(Exception):
pass
BOR_MAGIC_NUMBER = 0x69867884
@ -27,10 +31,14 @@ class Version0(object):
def make_preamble(self, version):
return struct.pack(self.preamble_schema, BOR_MAGIC_NUMBER, version)
def _check_eof(self, expected, actual):
if actual < expected:
raise EndOfFile()
def load_preamble(self, file_handle):
raw = file_handle.read(self.preamble_size)
self._check_eof(self.preamble_size, len(raw))
header = struct.unpack(self.preamble_schema, raw)
print "raw", raw
if header[0] != BOR_MAGIC_NUMBER:
raise OutOfSync("Expected Beginning of Record marker")
return header[1]
@ -106,12 +114,14 @@ class Version1(Version0):
def unpack(self, file_handle):
header_bytes = file_handle.read(self.header_size)
self._check_eof(self.header_size, len(header_bytes))
header = struct.unpack(self.header_schema, header_bytes)
if header[2] != 0:
raise OutOfSync("Didn't find 0 EOR marker.")
metadata_bytes = file_handle.read(header[0])
self._check_eof(header[0], len(metadata_bytes))
num_strings = struct.unpack_from("i", metadata_bytes)
offset = struct.calcsize("i")
lengths = num_strings[0] / 2
@ -127,12 +137,11 @@ class Version1(Version0):
for n in range(len(key_values))[::2])
raw = file_handle.read(header[1])
self._check_eof(header[1], len(raw))
raw_len = struct.unpack_from("i", raw)
offset = struct.calcsize("i")
raw_json = struct.unpack_from("%ds" % raw_len[0], raw, offset=offset)
notification = json.loads(raw_json[0])
return (metadata, notification)
jnot = struct.unpack_from("%ds" % raw_len[0], raw, offset=offset)
return (metadata, jnot[0])
VERSIONS = {1: Version1()}

View File

@ -13,12 +13,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import fnmatch
import os
import os.path
import archive
import disk_storage
import utils
class NoMoreFiles(Exception):
pass
class NoValidFile(Exception):
pass
class ArchiveCallback(object):
def on_open(self, filename):
"""Called when an Archive is opened."""
@ -30,35 +41,15 @@ class ArchiveCallback(object):
class RollManager(object):
def __init__(self, filename_template, roll_checker, directory=".",
def __init__(self, filename_template, directory=".",
archive_class=None, archive_callback=None):
self.filename_template = filename_template
self.roll_checker = roll_checker
self.directory = directory
self.active_archive = None
self.archive_class = archive_class
self.active_filename = None
self.archive_callback = archive_callback
def _make_filename(self):
f = utils.now().strftime(self.filename_template)
f = f.replace(" ", "_")
f = f.replace("/", "_")
return os.path.join(self.directory, f)
def get_active_archive(self):
if not self.active_archive:
self.active_filename = self._make_filename()
self.active_archive = self.archive_class(self.active_filename)
if self.archive_callback:
self.archive_callback.on_open(self.active_filename)
self.roll_checker.start(self.active_archive)
return self.active_archive
def _should_roll_archive(self):
return self.roll_checker.check(self.active_archive)
def _roll_archive(self):
self.close()
self.get_active_archive()
@ -73,17 +64,42 @@ class RollManager(object):
class ReadingRollManager(RollManager):
def __init__(self, filename_template, roll_checker, directory=".",
def __init__(self, filename_template, directory=".",
archive_class = archive.ArchiveReader,
archive_callback=None):
super(ReadingRollManager, self).__init__(filename_template,
roll_checker,
directory=directory,
archive_callback=event_callback,
archive_class=archive_class)
super(ReadingRollManager, self).__init__(
filename_template,
directory=directory,
archive_callback=archive_callback,
archive_class=archive_class)
self.files_to_read = self._get_matching_files(directory,
filename_template)
def _get_matching_files(self, directory, filename_template):
files = [os.path.join(directory, f)
for f in os.listdir(self.directory)
if os.path.isfile(os.path.join(directory, f))]
return sorted(fnmatch.filter(files, filename_template))
def read(self):
pass
# (metadata, payload)
for x in range(3): # 3 attempts to file a valid file.
a = self.get_active_archive()
try:
return a.read()
except disk_storage.EndOfFile:
self._roll_archive()
raise NoValidFile("Unable to find a valid file after 3 attempts")
def get_active_archive(self):
if not self.active_archive:
if not self.files_to_read:
raise NoMoreFiles()
self.active_filename = self.files_to_read.pop(0)
self.active_archive = self.archive_class(self.active_filename)
if self.archive_callback:
self.archive_callback.on_open(self.active_filename)
return self.active_archive
class WritingRollManager(RollManager):
@ -92,10 +108,10 @@ class WritingRollManager(RollManager):
archive_callback=None):
super(WritingRollManager, self).__init__(
filename_template,
roll_checker,
directory=directory,
archive_callback=archive_callback,
archive_class=archive_class)
self.roll_checker = roll_checker
def write(self, metadata, payload):
"""metadata is string:string dict.
@ -105,3 +121,22 @@ class WritingRollManager(RollManager):
a.write(metadata, payload)
if self._should_roll_archive():
self._roll_archive()
def _make_filename(self):
f = utils.now().strftime(self.filename_template)
f = f.replace(" ", "_")
f = f.replace("/", "_")
f = f.replace(":", "_")
return os.path.join(self.directory, f)
def get_active_archive(self):
if not self.active_archive:
self.active_filename = self._make_filename()
self.active_archive = self.archive_class(self.active_filename)
if self.archive_callback:
self.archive_callback.on_open(self.active_filename)
self.roll_checker.start(self.active_archive)
return self.active_archive
def _should_roll_archive(self):
return self.roll_checker.check(self.active_archive)

View File

@ -14,6 +14,7 @@
# limitations under the License.
import calendar
import collections
import datetime
import decimal
import json
@ -48,3 +49,17 @@ class DateTimeEncoder(json.JSONEncoder):
obj = obj - obj.utcoffset()
return str(dt_to_decimal(obj))
return super(DateTimeEncoder, self).default(obj)
# This is a hack for comparing structures load'ed from json
# (which are always unicode) back to strings. It's used
# for assertEqual() in the tests and is very slow and expensive.
def unicode_to_string(data):
if isinstance(data, basestring):
return str(data)
elif isinstance(data, collections.Mapping):
return dict(map(unicode_to_string, data.iteritems()))
elif isinstance(data, collections.Iterable):
return type(data)(map(unicode_to_string, data))
else:
return data

View File

@ -18,16 +18,30 @@ TEMPDIR = "test_temp"
class ArchiveCallback(object):
def __init__(self, active_files):
self.active_files = active_files
def __init__(self):
self.active_files = {}
self.ordered_files = []
def on_open(self, filename):
self.active_files.add(filename)
print "Opened:", filename
self.active_files[filename] = True
self.ordered_files.append(filename)
def on_close(self, filename):
self.active_files.remove(filename)
print "Closed:", filename
self.active_files[filename] = False
class VerifyArchiveCallback(object):
def __init__(self, original_files):
self.original_files = original_files
def on_open(self, filename):
o = self.original_files.pop(0)
if filename != o:
raise Exception("Wrong order: Got %s, Expected %s" %
(filename, o))
def on_close(self, filename):
pass
class TestSizeRolling(unittest.TestCase):
@ -40,11 +54,10 @@ class TestSizeRolling(unittest.TestCase):
pass
def test_size_rolling(self):
active_files = set()
callback = ArchiveCallback(active_files)
callback = ArchiveCallback()
checker = roll_checker.SizeRollChecker(1)
manager = roll_manager.WritingRollManager("test_%Y_%m_%d_%f.events",
manager = roll_manager.WritingRollManager("test_%Y_%m_%d_%X_%f.events",
checker,
TEMPDIR,
archive_callback=callback)
@ -69,5 +82,22 @@ class TestSizeRolling(unittest.TestCase):
now = g.move_to_next_tick(now)
manager.close()
raise Exception("Boom")
for filename, is_open in callback.active_files.items():
self.assertFalse(is_open)
vcallback = VerifyArchiveCallback(callback.ordered_files)
manager = roll_manager.ReadingRollManager("test_*.events",
TEMPDIR,
archive_callback=vcallback)
while True:
try:
# By comparing the json'ed version of
# the payloads we avoid all the issues
# with unicode and datetime->decimal conversions.
metadata, jpayload = manager.read()
orig_metadata, orig_jpayload = entries.pop(0)
self.assertEqual(orig_metadata, metadata)
self.assertEqual(orig_jpayload, jpayload)
except roll_manager.NoMoreFiles:
break

View File

@ -63,7 +63,8 @@ class TestVersion1(unittest.TestCase):
file_handle = mock.Mock()
file_handle.read.side_effect = blocks
m, p = self.v1.unpack(file_handle)
m, jp = self.v1.unpack(file_handle)
p = json.loads(jp)
self.assertEqual(metadata, m)
self.assertEqual(payload, p)

View File

@ -19,33 +19,9 @@ class FakeArchive(object):
class TestRollManager(unittest.TestCase):
def test_make_filename(self):
now = datetime.datetime(day=1, month=2, year=2014,
hour=10, minute=11, second=12)
x = roll_manager.RollManager("filename_%c.dat", None)
with mock.patch.object(utils, "now") as dt:
dt.return_value = now
filename = x._make_filename()
self.assertEqual(filename,
"./filename_Sat_Feb__1_10:11:12_2014.dat")
def test_get_active_archive(self):
checker = mock.Mock()
callback = mock.Mock()
filename_template = "filename_%c.dat"
x = roll_manager.RollManager(filename_template, checker,
archive_callback=callback,
archive_class=FakeArchive)
with mock.patch("shoebox.archive.ArchiveWriter._open_file") as of:
arc = x.get_active_archive()
self.assertTrue(checker.start.called)
self.assertTrue(callback.on_open.called)
def test_close(self):
callback = mock.Mock()
checker = mock.Mock()
x = roll_manager.RollManager("template", checker,
x = roll_manager.RollManager("template",
archive_callback=callback)
x.active_archive = mock.Mock()
x.active_filename = "foo"
@ -74,13 +50,33 @@ class TestWritingRollManager(unittest.TestCase):
x.write({}, "payload")
self.assertFalse(ra.called)
def test_correct_archiver(self):
x = roll_manager.WritingRollManager("foo", None)
print x.archive_class
self.assertEqual(x.archive_class, archive.ArchiveWriter)
def test_get_active_archive(self):
checker = mock.Mock()
callback = mock.Mock()
filename_template = "filename_%c.dat"
x = roll_manager.WritingRollManager(filename_template, checker)
x = roll_manager.WritingRollManager(filename_template, checker,
archive_callback=callback,
archive_class=FakeArchive)
with mock.patch("shoebox.archive.ArchiveWriter._open_file") as of:
arc = x.get_active_archive()
self.assertTrue(isinstance(arc, archive.ArchiveWriter))
self.assertTrue(checker.start.called)
self.assertTrue(callback.on_open.called)
def test_make_filename(self):
now = datetime.datetime(day=1, month=2, year=2014,
hour=10, minute=11, second=12)
x = roll_manager.WritingRollManager("filename_%c.dat", None)
with mock.patch.object(utils, "now") as dt:
dt.return_value = now
filename = x._make_filename()
self.assertEqual(filename,
"./filename_Sat_Feb__1_10_11_12_2014.dat")
class TestWriting(unittest.TestCase):