split out shoebox into smaller files

This commit is contained in:
Sandy Walsh 2014-05-13 01:42:46 +00:00
parent 72a603e58b
commit 2dee06e43d
7 changed files with 206 additions and 293 deletions

View File

@ -3,11 +3,16 @@ shoebox
binary data archiving library - supports uploading to object storage
Requires PyTables, which needs (on ubuntu):
There are ArchiveReaders and ArchiveWriters which are managed
by RollManager. The RollManager opens and closes Archivers as
needed. "As needed" is determined by the RollChecker that was
passed into the RollManager. Archive files can roll over based
on file size or elapsed time (for writing). For reading, archive
files are only rolled over when the EOF is reached.
sudo apt-get install libhdf5-serial-dev
sudo pip install numpy
sudo pip install numexpr
sudo pip install cython
sudo pip install tables
Roll Managers also take care of filename creation, compression
of completed archives and transfer of archive files to remote
storage locations.
TODO: How will the ReadingRollManager know which files to read
from, and in which order, if the filename is templated?

50
shoebox/archive.py Normal file
View File

@ -0,0 +1,50 @@
# Copyright (c) 2014 Dark Secret Software Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class Archive(object):
    """Base class pairing an archive filename with its file handle.

    The handle starts out as None; this class never opens a file itself.
    """

    def __init__(self, filename):
        """Remember `filename` and start with no open handle."""
        self.filename = filename
        self._handle = None

    def get_file_handle(self):
        """Return the stored file handle (None until one is assigned)."""
        return self._handle
class ArchiveWriter(Archive):
    """The active Archive for appending.

    The file is opened in the constructor with mode "wb+", which creates
    it or truncates an existing file of the same name. Nothing in this
    class closes the handle.
    """

    def __init__(self, filename):
        """Initialise the base Archive, then open `filename` for writing."""
        super(ArchiveWriter, self).__init__(filename)
        self._handle = open(filename, "wb+")

    def write(self, payload):
        """Placeholder: accepts `payload` and currently does nothing."""
        return None
class ArchiveReader(Archive):
    """The active Archive for consuming.

    The constructor only records the filename; no file is opened, so the
    handle stays None. All read methods are placeholders.
    """

    def __init__(self, filename):
        """Delegate to Archive.__init__ without opening anything."""
        super(ArchiveReader, self).__init__(filename)

    def read_block(self):
        """Placeholder: not implemented yet, returns None."""
        return None

    def read_header(self):
        """Placeholder: not implemented yet, returns None."""
        return None

    def read_payload(self):
        """Placeholder: not implemented yet, returns None."""
        return None

52
shoebox/roll_checker.py Normal file
View File

@ -0,0 +1,52 @@
# Copyright (c) 2014 Dark Secret Software Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import utils
class RollChecker(object):
    """Base policy object deciding when an archive should roll over.

    Both hooks are no-ops here and return None; subclasses override them.
    """

    def start(self, archive):
        """Called when a new archive is selected."""
        return None

    def check(self, archive):
        """Should the current archive roll?"""
        return None
class NeverRollChecker(RollChecker):
    """Roll policy that keeps the same archive forever."""

    def check(self, archive):
        """Always answer False, whatever `archive` is."""
        return False
class TimeRollChecker(RollChecker):
    """Roll policy based on elapsed time since the archive was started.

    `start_time` and `end_time` only exist after start() has run (or
    after a caller assigns them), so check() needs one of those first.
    """

    def __init__(self, timedelta):
        """Store the interval that is added to the start time."""
        self.timedelta = timedelta

    def start(self, archive):
        """Record the current time and the moment the archive expires."""
        started = utils.now()
        self.start_time = started
        self.end_time = started + self.timedelta

    def check(self, archive):
        """Return True once the current time has reached `end_time`."""
        current = utils.now()
        return current >= self.end_time
class SizeRollChecker(RollChecker):
    """Roll policy based on how many bytes have been written.

    The archive rolls once the file position reported by its handle
    reaches `size_in_gb` gigabytes.
    """

    # Number of bytes in one gigabyte (2 ** 30).
    BYTES_PER_GB = 1073741824

    def __init__(self, size_in_gb):
        """Store the size limit, expressed in gigabytes."""
        self.size_in_gb = size_in_gb

    def check(self, archive):
        """Return True once the archive has reached the size limit.

        `archive` must provide get_file_handle(), returning an object
        with a tell() method.
        """
        # Archive exposes get_file_handle(); there is no underscore-prefixed
        # variant, so the public accessor is the one to call.
        size = archive.get_file_handle().tell()
        # Compare in bytes rather than dividing: under Python 2, "/" on
        # ints floors, which would break fractional limits such as 0.5.
        return size >= self.size_in_gb * self.BYTES_PER_GB

72
shoebox/roll_manager.py Normal file
View File

@ -0,0 +1,72 @@
# Copyright (c) 2014 Dark Secret Software Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import archive
import utils
class RollManager(object):
    """Creates the active archive on demand and asks a checker when to roll.

    Subclasses are expected to set `archive_class` to the class used to
    build archives; this base class does not define it.
    """

    def __init__(self, filename_template, roll_checker, directory="."):
        """Store the configuration; no archive is created yet.

        filename_template: strftime-style template for archive names.
        roll_checker: object providing start(archive) and check(archive).
        directory: directory in which archive files are created.
        """
        self.filename_template = filename_template
        self.roll_checker = roll_checker
        self.directory = directory
        self.active_archive = None

    def _make_filename(self):
        """Expand the template with the current time; spaces become "_"."""
        f = utils.now().strftime(self.filename_template)
        return f.replace(" ", "_")

    def get_active_archive(self):
        """Return the active archive, creating and starting it if needed."""
        if self.active_archive is None:
            # Function-scope import keeps this block self-contained.
            import os

            # Honour the configured directory; previously it was stored
            # but never applied, so files always landed in the cwd.
            path = os.path.join(self.directory, self._make_filename())
            self.active_archive = self.archive_class(path)
            self.roll_checker.start(self.active_archive)
        return self.active_archive

    def _should_roll_archive(self):
        """Ask the roll checker about the active archive (may be None)."""
        return self.roll_checker.check(self.active_archive)

    def _roll_archive(self):
        """Placeholder: rolling is not implemented yet."""
        pass
class ReadingRollManager(RollManager):
    """RollManager whose archives are ArchiveReader instances.

    The read methods are placeholders that currently do nothing.
    """

    def __init__(self, filename_template, roll_checker, directory="."):
        """Configure the base manager and select the reader archive class."""
        super(ReadingRollManager, self).__init__(
            filename_template, roll_checker, directory)
        self.archive_class = archive.ArchiveReader

    def read_block(self):
        """Placeholder: not implemented yet, returns None."""
        return None

    def read_header(self):
        """Placeholder: not implemented yet, returns None."""
        return None

    def read_payload(self):
        """Placeholder: not implemented yet, returns None."""
        return None
class WritingRollManager(RollManager):
    """RollManager whose archives are ArchiveWriter instances."""

    def __init__(self, filename_template, roll_checker, directory="."):
        """Configure the base manager and select the writer archive class."""
        super(WritingRollManager, self).__init__(
            filename_template, roll_checker, directory)
        self.archive_class = archive.ArchiveWriter

    def write(self, payload):
        """Write `payload` to the active archive, then roll if required."""
        self.get_active_archive().write(payload)
        # The roll decision is taken after the write, never before it.
        if not self._should_roll_archive():
            return
        self._roll_archive()

View File

@ -1,192 +0,0 @@
# Copyright (c) 2014 Dark Secret Software Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import struct
"""Binary data archiving library.
Data is written in the following format:
[ENTRY] =
<START-OF-BLOCK> byte # 0x05
<BLOCK-TYPE> long # new versions get new block-types
<CRC> ulong
<SIZE> long
[PAYLOAD] =
<START-OF-PAYLOAD> byte # 0x06
<RECORD_COUNT> int
[RECORD, RECORD, ...]
[RECORD] =
<SIZE> long
<KEY> varstr (%s)
<TYPE> int
<UNION>
<BOOLEAN>
<INT>
<FLOAT>
<DATETIME>
<STRING>
There are ArchiveReaders and ArchiveWriters which are managed
by RollManager. The RollManager opens and closes Archivers as
needed. "As needed" is determined by the RollChecker that was
passed into the RollManager. Archive files can roll over based
on file size or elapsed time (for writing). For reading, archive
files are only rolled over when the EOF is reached.
Roll Managers also take care of filename creation, compression
of completed archives and transfer of archive files to remote
storage locations.
TODO: How will the ReadingRollManager know which files to read
from, and in which order, if the filename is templated?
"""
def now():
    """Return the current UTC time as a naive datetime.

    Kept as a separate function so tests can patch it.
    """
    current_utc = datetime.datetime.utcnow()
    return current_utc
class RollChecker(object):
    """Base policy object deciding when an archive should roll over.

    Both hooks are no-ops here and return None; subclasses override them.
    """

    def start(self, archive):
        """Called when a new archive is selected."""
        return None

    def check(self, archive):
        """Should the current archive roll?"""
        return None
class NeverRollChecker(RollChecker):
    """Roll policy that keeps the same archive forever."""

    def check(self, archive):
        """Always answer False, whatever `archive` is."""
        return False
class TimeRollChecker(RollChecker):
    """Roll policy based on elapsed time since the archive was started.

    `start_time` and `end_time` only exist after start() has run (or
    after a caller assigns them), so check() needs one of those first.
    """

    def __init__(self, timedelta):
        """Store the interval that is added to the start time."""
        self.timedelta = timedelta

    def start(self, archive):
        """Record the current time and the moment the archive expires."""
        started = now()
        self.start_time = started
        self.end_time = started + self.timedelta

    def check(self, archive):
        """Return True once the current time has reached `end_time`."""
        current = now()
        return current >= self.end_time
class SizeRollChecker(RollChecker):
    """Roll policy based on how many bytes have been written.

    The archive rolls once the file position reported by its handle
    reaches `size_in_gb` gigabytes.
    """

    # Number of bytes in one gigabyte (2 ** 30).
    BYTES_PER_GB = 1073741824

    def __init__(self, size_in_gb):
        """Store the size limit, expressed in gigabytes."""
        self.size_in_gb = size_in_gb

    def check(self, archive):
        """Return True once the archive has reached the size limit.

        `archive` must provide get_file_handle(), returning an object
        with a tell() method.
        """
        # Archive exposes get_file_handle(); there is no underscore-prefixed
        # variant, so the public accessor is the one to call.
        size = archive.get_file_handle().tell()
        # Compare in bytes rather than dividing: under Python 2, "/" on
        # ints floors, which would break fractional limits such as 0.5.
        return size >= self.size_in_gb * self.BYTES_PER_GB
class RollManager(object):
    """Creates the active archive on demand and asks a checker when to roll.

    Subclasses are expected to set `archive_class` to the class used to
    build archives; this base class does not define it.
    """

    def __init__(self, filename_template, roll_checker, directory="."):
        """Store the configuration; no archive is created yet.

        filename_template: strftime-style template for archive names.
        roll_checker: object providing start(archive) and check(archive).
        directory: directory in which archive files are created.
        """
        self.filename_template = filename_template
        self.roll_checker = roll_checker
        self.directory = directory
        self.active_archive = None

    def _make_filename(self):
        """Expand the template with the current time; spaces become "_"."""
        f = now().strftime(self.filename_template)
        return f.replace(" ", "_")

    def get_active_archive(self):
        """Return the active archive, creating and starting it if needed."""
        if self.active_archive is None:
            # Function-scope import keeps this block self-contained.
            import os

            # Honour the configured directory; previously it was stored
            # but never applied, so files always landed in the cwd.
            path = os.path.join(self.directory, self._make_filename())
            self.active_archive = self.archive_class(path)
            self.roll_checker.start(self.active_archive)
        return self.active_archive

    def _should_roll_archive(self):
        """Ask the roll checker about the active archive (may be None)."""
        return self.roll_checker.check(self.active_archive)

    def _roll_archive(self):
        """Placeholder: rolling is not implemented yet."""
        pass
class ReadingRollManager(RollManager):
    """RollManager whose archives are ArchiveReader instances.

    The read methods are placeholders that currently do nothing.
    """

    def __init__(self, filename_template, roll_checker, directory="."):
        """Configure the base manager and select the reader archive class."""
        super(ReadingRollManager, self).__init__(
            filename_template, roll_checker, directory)
        self.archive_class = ArchiveReader

    def read_block(self):
        """Placeholder: not implemented yet, returns None."""
        return None

    def read_header(self):
        """Placeholder: not implemented yet, returns None."""
        return None

    def read_payload(self):
        """Placeholder: not implemented yet, returns None."""
        return None
class WritingRollManager(RollManager):
    """RollManager whose archives are ArchiveWriter instances."""

    def __init__(self, filename_template, roll_checker, directory="."):
        """Configure the base manager and select the writer archive class."""
        super(WritingRollManager, self).__init__(
            filename_template, roll_checker, directory)
        self.archive_class = ArchiveWriter

    def write(self, payload):
        """Write `payload` to the active archive, then roll if required."""
        self.get_active_archive().write(payload)
        # The roll decision is taken after the write, never before it.
        if not self._should_roll_archive():
            return
        self._roll_archive()
class Archive(object):
    """Base class pairing an archive filename with its file handle.

    The handle starts out as None; this class never opens a file itself.
    """

    def __init__(self, filename):
        """Remember `filename` and start with no open handle."""
        self.filename = filename
        self._handle = None

    def get_file_handle(self):
        """Return the stored file handle (None until one is assigned)."""
        return self._handle
class ArchiveWriter(Archive):
    """The active Archive for appending.

    The file is opened in the constructor with mode "wb+", which creates
    it or truncates an existing file of the same name. Nothing in this
    class closes the handle.
    """

    def __init__(self, filename):
        """Initialise the base Archive, then open `filename` for writing."""
        super(ArchiveWriter, self).__init__(filename)
        self._handle = open(filename, "wb+")

    def write(self, payload):
        """Placeholder: accepts `payload` and currently does nothing."""
        return None
class ArchiveReader(Archive):
    """The active Archive for consuming.

    The constructor only records the filename; no file is opened, so the
    handle stays None. All read methods are placeholders.
    """

    def __init__(self, filename):
        """Delegate to Archive.__init__ without opening anything."""
        super(ArchiveReader, self).__init__(filename)

    def read_block(self):
        """Placeholder: not implemented yet, returns None."""
        return None

    def read_header(self):
        """Placeholder: not implemented yet, returns None."""
        return None

    def read_payload(self):
        """Placeholder: not implemented yet, returns None."""
        return None

21
shoebox/utils.py Normal file
View File

@ -0,0 +1,21 @@
# Copyright (c) 2014 Dark Secret Software Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
def now():
    """Return the current UTC time as a naive datetime.

    Kept as a separate function so tests can patch it.
    """
    current_utc = datetime.datetime.utcnow()
    return current_utc

View File

@ -1,95 +0,0 @@
import datetime
import mock
import unittest
from shoebox import shoebox
class TestRollChecker(unittest.TestCase):
    """Unit tests for the time-based and size-based roll checkers."""

    def test_time_roll_checker_start(self):
        """start() records the patched current time and the end time."""
        one_hour = datetime.timedelta(hours=1)
        x = shoebox.TimeRollChecker(one_hour)
        now = datetime.datetime.utcnow()
        with mock.patch.object(shoebox, 'now') as dt:
            dt.return_value = now
            x.start(None)
        self.assertEqual(x.start_time, now)
        self.assertEqual(x.end_time, now + one_hour)

    def test_time_roll_checker_end(self):
        """check() is True at or after end_time and False before it."""
        one_hour = datetime.timedelta(hours=1)
        one_second = datetime.timedelta(seconds=1)
        x = shoebox.TimeRollChecker(one_hour)
        now = datetime.datetime.utcnow()
        x.start_time = now
        x.end_time = now + one_hour

        # (patched current time, expected check() result)
        cases = [
            (now + one_hour, True),
            (now, False),
            (now + one_hour - one_second, False),
        ]
        for current, expected in cases:
            with mock.patch.object(shoebox, 'now') as dt:
                dt.return_value = current
                self.assertEqual(x.check(None), expected)

    def test_size_roll_checker_end(self):
        """check() is True once tell() reports at least the size limit."""
        one_gig = 1073741824
        x = shoebox.SizeRollChecker(10)

        # Archive's real accessor is get_file_handle(). Both spellings are
        # wired to the same handle so the test does not silently depend on
        # an accessor name that Archive does not define.
        handle = mock.Mock()
        archive = mock.Mock()
        archive.get_file_handle.return_value = handle
        archive._get_file_handle.return_value = handle

        for gigs, expected in ((5, False), (10, True), (11, True)):
            handle.tell.return_value = one_gig * gigs
            self.assertEqual(x.check(archive), expected)
class TestRollManager(unittest.TestCase):
    """Unit tests for RollManager filename generation."""

    def test_make_filename(self):
        """The template is expanded via strftime and spaces become "_"."""
        # Use the local for the constructor call; it was previously unused
        # and held a different value ("filename_%c") from what was passed.
        filename_template = "filename_%c.dat"
        now = datetime.datetime(year=2014, month=2, day=1,
                                hour=10, minute=11, second=12)
        x = shoebox.RollManager(filename_template, None)
        with mock.patch.object(shoebox, "now") as dt:
            dt.return_value = now
            filename = x._make_filename()
        # NOTE(review): %c is locale dependent; the expected value below
        # presumably assumes the C/English locale - confirm on CI.
        self.assertEqual(filename, "filename_Sat_Feb__1_10:11:12_2014.dat")
class TestWritingRollManager(unittest.TestCase):
    """Unit tests for WritingRollManager.

    These tests build real ArchiveWriter objects, so files named after
    the templates are created in the current working directory.
    """

    def _write_once(self, should_roll):
        """Write one payload with check() fixed; return the roll mock."""
        roll_checker = mock.Mock()
        roll_checker.check.return_value = should_roll
        x = shoebox.WritingRollManager("template", roll_checker)
        with mock.patch.object(x, "_roll_archive") as ra:
            x.write("payload")
        return ra

    def test_get_active_archive(self):
        """The first access creates an ArchiveWriter and starts the checker."""
        roll_checker = mock.Mock()
        filename_template = "filename_%c.dat"
        x = shoebox.WritingRollManager(filename_template, roll_checker)
        archive = x.get_active_archive()
        self.assertTrue(isinstance(archive, shoebox.ArchiveWriter))
        self.assertTrue(roll_checker.start.called)

    def test_write_always_roll(self):
        """A checker answering True triggers _roll_archive()."""
        ra = self._write_once(True)
        self.assertTrue(ra.called)

    def test_write_never_roll(self):
        """A checker answering False leaves _roll_archive() uncalled."""
        ra = self._write_once(False)
        self.assertFalse(ra.called)
class TestWriting(unittest.TestCase):
    """Smoke test: several writes through a manager that never rolls."""

    def test_write(self):
        """Ten consecutive writes complete without raising."""
        roll_checker = shoebox.NeverRollChecker()
        manager = shoebox.WritingRollManager("template_%s", roll_checker)
        payloads = ["payload_%d" % index for index in range(10)]
        for payload in payloads:
            manager.write(payload)