diff --git a/shoebox/roll_manager.py b/shoebox/roll_manager.py index 598c566..cc59539 100644 --- a/shoebox/roll_manager.py +++ b/shoebox/roll_manager.py @@ -153,7 +153,13 @@ class WritingJSONRollManager(object): self.directory = kwargs.get('directory', '.') self.destination_directory = kwargs.get('destination_directory', '.') self.roll_size_mb = int(kwargs.get('roll_size_mb', 1000)) - self.check_delay = 0 # Only check directory size every N events. + + # Read the directory at the start, but incrementally + # update the size as we write more files. Doing a full + # disk stat every time is way too expensive. + # Of course, this means we are not accounting for + # block size. + self.directory_size = self._get_directory_size() def _make_filename(self, crc, prefix): now = notification_utils.now() @@ -167,19 +173,22 @@ class WritingJSONRollManager(object): return os.path.join(prefix, f) def _should_tar(self): + return (self.directory_size / 1048576) >= self.roll_size_mb + + def _get_directory_size(self): size = 0 for f in os.listdir(self.directory): full = os.path.join(self.directory, f) if os.path.isfile(full): size += os.path.getsize(full) - - return (size / 1048576) >= self.roll_size_mb + return size def _tar_directory(self): # tar all the files in working directory into an archive # in destination_directory. crc = "archive" - filename = self._make_filename(crc, self.destination_directory) + ".tar.gz" + filename = self._make_filename(crc, self.destination_directory) + \ + ".tar.gz" # No contextmgr for tarfile in 2.6 :( tar = tarfile.open(filename, "w:gz") @@ -194,13 +203,7 @@ class WritingJSONRollManager(object): full = os.path.join(self.directory, f) if os.path.isfile(full): os.remove(full) - - def _delay_check(self): - self.check_delay += 1 - if self.check_delay > 250: - self.check_delay = 0 - return False - return True + self.directory_size = self._get_directory_size() def write(self, metadata, json_payload): # Metadata is ignored. @@ -209,8 +212,7 @@ class WritingJSONRollManager(object): with open(filename, "w") as f: f.write(json_payload) - if self._delay_check(): - return + self.directory_size += len(json_payload) if not self._should_tar(): return @@ -218,6 +220,5 @@ class WritingJSONRollManager(object): self._tar_directory() self._clean_working_directory() - def close(self): pass diff --git a/test/integration/test_json_tarball.py b/test/integration/test_json_tarball.py new file mode 100644 index 0000000..0f25487 --- /dev/null +++ b/test/integration/test_json_tarball.py @@ -0,0 +1,83 @@ +import datetime +import hashlib +import json +import mock +import os +import shutil +import tarfile +import unittest + +import notification_utils +import notigen + +from shoebox import roll_manager + + +TEMPDIR = "test_temp" +DESTDIR = "test_temp/output" + + +class TestDirectory(unittest.TestCase): + def setUp(self): + shutil.rmtree(TEMPDIR, ignore_errors=True) + shutil.rmtree(DESTDIR, ignore_errors=True) + os.mkdir(TEMPDIR) + os.mkdir(DESTDIR) + + def test_size_rolling(self): + manager = roll_manager.WritingJSONRollManager( + "%Y_%m_%d_%X_%f_[[CRC]].event", + directory=TEMPDIR, + destination_directory=DESTDIR, + roll_size_mb=10) + + g = notigen.EventGenerator("test/integration/templates") + entries = {} + now = datetime.datetime.utcnow() + while len(entries) < 10000: + events = g.generate(now) + if events: + for event in events: + metadata = {} + json_event = json.dumps(event, + cls=notification_utils.DateTimeEncoder) + manager.write(metadata, json_event) + crc = hashlib.sha256(json_event).hexdigest() + entries[crc] = json_event + + now = g.move_to_next_tick(now) + manager.close() + + # Confirm files and tarballs ... + print "Starting entries:", len(entries) + date_len = len("2015_02_24_14_15_58_037080_") + num = 0 + for f in os.listdir(TEMPDIR): + full = os.path.join(TEMPDIR, f) + if os.path.isfile(full): + crc = f[date_len:-len(".event")] + del entries[crc] + num += 1 + print "Untarred entries:", num, "Remaining:", len(entries) + + # the rest have to be in tarballs ... + for f in os.listdir(DESTDIR): + num = 0 + actual = 0 + tar = tarfile.open(os.path.join(DESTDIR, f), "r:gz") + for tarinfo in tar: + crc = tarinfo.name[len(TEMPDIR) + 1 + date_len:-len(".event")] + actual += 1 + if crc: + del entries[crc] + num += 1 + + if actual == 1: + raise Exception("tarball has 1 entry. Something is wrong.") + + print "In %s: %d of %d Remaining: %d" % (f, num, actual, + len(entries)) + tar.close() + + if len(entries): + raise Exception("%d more events than generated." % len(entries)) diff --git a/test/test_roll_manager.py b/test/test_roll_manager.py index 76feced..0b376c4 100644 --- a/test/test_roll_manager.py +++ b/test/test_roll_manager.py @@ -95,7 +95,10 @@ class TestWriting(unittest.TestCase): class TestJSONRollManager(unittest.TestCase): - def test_make_filename(self): + @mock.patch( + "shoebox.roll_manager.WritingJSONRollManager._get_directory_size") + def test_make_filename(self, gds): + gds.return_value = 1000 now = datetime.datetime(day=1, month=2, year=2014, hour=10, minute=11, second=12) with mock.patch.object(notification_utils, "now") as dt: @@ -110,20 +113,34 @@ class TestJSONRollManager(unittest.TestCase): @mock.patch('os.path.getsize') @mock.patch('os.listdir') @mock.patch('os.path.isfile') - def test_should_tar(self, isf, ld, gs): + def test_get_directory_size(self, isf, ld, gs): rm = roll_manager.WritingJSONRollManager("template.foo") gs.return_value = 250000 ld.return_value = ['a', 'b', 'c'] isf.return_value = True - rm.roll_size_mb = 1 - self.assertFalse(rm._should_tar()) + self.assertEqual(250000*3, rm._get_directory_size()) ld.return_value = ['a', 'b', 'c', 'd', 'e', 'f'] + self.assertEqual(250000*6, rm._get_directory_size()) + + @mock.patch( + "shoebox.roll_manager.WritingJSONRollManager._get_directory_size") + def test_should_tar(self, gds): + gds.return_value = 1000 + rm = roll_manager.WritingJSONRollManager("template.foo") + rm.directory_size = 9 * 1048576 + rm.roll_size_mb = 10 + self.assertFalse(rm._should_tar()) + rm.directory_size = 10 * 1048576 + rm.roll_size_mb = 10 self.assertTrue(rm._should_tar()) @mock.patch('os.listdir') @mock.patch('os.remove') @mock.patch('os.path.isfile') - def test_clean_working_directory(self, isf, rem, ld): + @mock.patch( + "shoebox.roll_manager.WritingJSONRollManager._get_directory_size") + def test_clean_working_directory(self, gds, isf, rem, ld): + gds.return_value = 1000 isf.return_value = True rm = roll_manager.WritingJSONRollManager("template.foo") ld.return_value = ['a', 'b', 'c'] @@ -133,9 +150,13 @@ class TestJSONRollManager(unittest.TestCase): @mock.patch('os.listdir') @mock.patch('tarfile.open') @mock.patch('os.path.isfile') - def test_tar_directory(self, isf, to, ld): + @mock.patch( + "shoebox.roll_manager.WritingJSONRollManager._get_directory_size") + def test_tar_directory(self, gds, isf, to, ld): + gds.return_value = 1000 ld.return_value = ['a', 'b', 'c'] isf.return_value = True + gds = 1000 rm = roll_manager.WritingJSONRollManager("template.foo") open_name = '%s.open' % roll_manager.__name__ @@ -145,7 +166,10 @@ class TestJSONRollManager(unittest.TestCase): rm._tar_directory() self.assertTrue(to.called) - def test_write(self): + @mock.patch( + "shoebox.roll_manager.WritingJSONRollManager._get_directory_size") + def test_write(self, gds): + gds.return_value = 0 rm = roll_manager.WritingJSONRollManager("template.foo") payload = "some big payload" open_name = '%s.open' % roll_manager.__name__ @@ -158,3 +182,4 @@ class TestJSONRollManager(unittest.TestCase): self.assertTrue(mock_open.called_once_with( "template.foo", "wb")) self.assertFalse(td.called) + self.assertEqual(rm.directory_size, len(payload))