Simple per-event file output.

Records the event as a json payload in a file that
may include a CRC of the payload. This is typically used
when storing events in something like HDFS.

Change-Id: Iaecca2397afa3501e437e27d698f3573760f1ac0
This commit is contained in:
Sandy Walsh 2015-02-16 14:34:22 -08:00
parent a3e80502f6
commit 852c1853e5
2 changed files with 66 additions and 0 deletions

View File

@ -14,6 +14,8 @@
# limitations under the License.
import fnmatch
import gzip
import hashlib
import os
import os.path
@ -31,6 +33,10 @@ class NoValidFile(Exception):
pass
class BadWorkingDirectory(Exception):
pass
class RollManager(object):
def __init__(self, filename_template, directory=".",
archive_class=None, archive_callback=None):
@ -131,3 +137,34 @@ class WritingRollManager(RollManager):
def _should_roll_archive(self):
return self.roll_checker.check(self.active_archive)
class WritingJSONRollManager(object):
"""No archiver. No roll checker. Just write the file locally.
Expects an external tool like rsync to move the file.
A SHA-256 of the payload may be included in the filename."""
def __init__(self, *args, **kwargs):
self.filename_template = args[0]
self.directory = kwargs.get('directory', '.')
if not os.path.isdir(self.directory):
raise BadWorkingDirectory("Directory '%s' does not exist" %
self.directory)
def _make_filename(self, crc):
now = notification_utils.now()
dt = str(notification_utils.dt_to_decimal(now))
f = now.strftime(self.filename_template)
f = f.replace(" ", "_")
f = f.replace("/", "_")
f = f.replace(":", "_")
f = f.replace("[[CRC]]", crc)
f = f.replace("[[TIMESTAMP]]", dt)
return os.path.join(self.directory, f)
def write(self, metadata, json_payload):
# Metadata is ignored.
crc = hashlib.sha256(json_payload).hexdigest()
filename = self._make_filename(crc)
f = gzip.open(filename, 'wb')
f.write(json_payload)
f.close()

View File

@ -92,3 +92,32 @@ class TestWriting(unittest.TestCase):
arc = x.get_active_archive()
self.assertEqual(10, len(arc.data))
self.assertEqual(({"index": "0"}, "payload_0"), arc.data[0])
class TestJSONRollManager(unittest.TestCase):
def test_bad_directory(self):
try:
roll_manager.WritingJSONRollManager("x", directory="bad_directory")
self.fail("Should raise BadWorkingDirectory")
except roll_manager.BadWorkingDirectory as e:
pass
def test_make_filename(self):
now = datetime.datetime(day=1, month=2, year=2014,
hour=10, minute=11, second=12)
with mock.patch.object(notification_utils, "now") as dt:
with mock.patch.object(notification_utils, "dt_to_decimal") as td:
td.return_value = 123.45
dt.return_value = now
x = roll_manager.WritingJSONRollManager(
"%Y%m%d [[TIMESTAMP]] [[CRC]].foo")
fn = x._make_filename("mycrc")
self.assertEqual("./20140201_123.45_mycrc.foo", fn)
def test_write(self):
x = roll_manager.WritingJSONRollManager("template.foo")
with mock.patch.object(roll_manager.gzip, "open") as gz:
x.write("metadata", "json_payload")
self.assertTrue(gz.called_once_with("template.foo", "wb"))