Add basic lazy gettext implementation

This is the first part of an implementation towards
a deferred localization functionality in OpenStack.
This change adds a Message class for encapsulating a
translatable message and its relevant data, as well
as an example LogHandler of how such a class can be
used in the logging context.

bp delayed-message-translation

Change-Id: I8485a346d32925327ea9185e0da3822e4e19c2f5
This commit is contained in:
Matt Odden 2013-04-15 13:17:11 +00:00
parent 581709cb57
commit 83d5fe53be
2 changed files with 547 additions and 1 deletions

View File

@ -2,6 +2,7 @@
# Copyright 2012 Red Hat, Inc.
# All Rights Reserved.
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -23,8 +24,11 @@ Usual usage in an openstack.common module:
from openstack.common.gettextutils import _
"""
import copy
import gettext
import logging.handlers
import os
import UserString
_localedir = os.environ.get('oslo'.upper() + '_LOCALEDIR')
_t = gettext.translation('oslo', localedir=_localedir, fallback=True)
@ -48,3 +52,175 @@ def install(domain):
gettext.install(domain,
localedir=os.environ.get(domain.upper() + '_LOCALEDIR'),
unicode=True)
"""
Lazy gettext functionality.
The following is an attempt to introduce a deferred way
to do translations on messages in OpenStack. We attempt to
override the standard _() function and % (format string) operation
to build Message objects that can later be translated when we have
more information. Also included is an example LogHandler that
translates Messages to an associated locale, effectively allowing
many logs, each with their own locale.
"""
def get_lazy_gettext(domain):
"""Assemble and return a lazy gettext function for a given domain.
Factory method for a project/module to get a lazy gettext function
for its own translation domain (i.e. nova, glance, cinder, etc.)
"""
def _lazy_gettext(msg):
"""
Create and return a Message object encapsulating a string
so that we can translate it later when needed.
"""
return Message(msg, domain)
return _lazy_gettext
class Message(UserString.UserString, object):
"""Class used to encapsulate translatable messages."""
def __init__(self, msg, domain):
# _msg is the gettext msgid and should never change
self._msg = msg
self._left_extra_msg = ''
self._right_extra_msg = ''
self.params = None
self.locale = None
self.domain = domain
@property
def data(self):
# NOTE(mrodden): this should always resolve to a unicode string
# that best represents the state of the message currently
localedir = os.environ.get(self.domain.upper() + '_LOCALEDIR')
if self.locale:
lang = gettext.translation(self.domain,
localedir=localedir,
languages=[self.locale],
fallback=True)
else:
# use system locale for translations
lang = gettext.translation(self.domain,
localedir=localedir,
fallback=True)
full_msg = (self._left_extra_msg +
lang.ugettext(self._msg) +
self._right_extra_msg)
if self.params is not None:
full_msg = full_msg % self.params
return unicode(full_msg)
def _save_parameters(self, other):
# we check for None later to see if
# we actually have parameters to inject,
# so encapsulate if our parameter is actually None
if other is None:
self.params = (other, )
else:
self.params = copy.deepcopy(other)
return self
# overrides to be more string-like
def __unicode__(self):
return self.data
def __str__(self):
return self.data.encode('utf-8')
def __getstate__(self):
to_copy = ['_msg', '_right_extra_msg', '_left_extra_msg',
'domain', 'params', 'locale']
new_dict = self.__dict__.fromkeys(to_copy)
for attr in to_copy:
new_dict[attr] = copy.deepcopy(self.__dict__[attr])
return new_dict
def __setstate__(self, state):
for (k, v) in state.items():
setattr(self, k, v)
# operator overloads
def __add__(self, other):
copied = copy.deepcopy(self)
copied._right_extra_msg += other.__str__()
return copied
def __radd__(self, other):
copied = copy.deepcopy(self)
copied._left_extra_msg += other.__str__()
return copied
def __mod__(self, other):
# do a format string to catch and raise
# any possible KeyErrors from missing parameters
self.data % other
copied = copy.deepcopy(self)
return copied._save_parameters(other)
def __mul__(self, other):
return self.data * other
def __rmul__(self, other):
return other * self.data
def __getitem__(self, key):
return self.data[key]
def __getslice__(self, start, end):
return self.data.__getslice__(start, end)
def __getattribute__(self, name):
# NOTE(mrodden): handle lossy operations that we can't deal with yet
# These override the UserString implementation, since UserString
# uses our __class__ attribute to try and build a new message
# after running the inner data string through the operation.
# At that point, we have lost the gettext message id and can just
# safely resolve to a string instead.
ops = ['capitalize', 'center', 'decode', 'encode',
'expandtabs', 'ljust', 'lstrip', 'replace', 'rjust', 'rstrip',
'strip', 'swapcase', 'title', 'translate', 'upper', 'zfill']
if name in ops:
return getattr(self.data, name)
else:
return UserString.UserString.__getattribute__(self, name)
class LocaleHandler(logging.Handler):
"""Handler that can have a locale associated to translate Messages.
A quick example of how to utilize the Message class above.
LocaleHandler takes a locale and a target logging.Handler object
to forward LogRecord objects to after translating the internal Message.
"""
def __init__(self, locale, target):
"""
Initialize a LocaleHandler
:param locale: locale to use for translating messages
:param target: logging.Handler object to forward
LogRecord objects to after translation
"""
logging.Handler.__init__(self)
self.locale = locale
self.target = target
def emit(self, record):
if isinstance(record.msg, Message):
# set the locale and resolve to a string
record.msg.locale = self.locale
self.target.emit(record)

View File

@ -2,6 +2,7 @@
# Copyright 2012 Red Hat, Inc.
# All Rights Reserved.
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -15,7 +16,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
import copy
import gettext
import logging.handlers
import os
import mock
@ -42,3 +46,369 @@ class GettextTest(utils.BaseTestCase):
gettext_install.assert_called_once_with('blaa',
localedir='/foo/bar',
unicode=True)
class MessageTestCase(utils.BaseTestCase):
"""Unit tests for locale Message class."""
def setUp(self):
super(MessageTestCase, self).setUp()
self._lazy_gettext = gettextutils.get_lazy_gettext('oslo')
def tearDown(self):
# need to clean up stubs early since they interfere
# with super class clean up operations
self.mox.UnsetStubs()
super(MessageTestCase, self).tearDown()
def test_message_equal_to_string(self):
msgid = "Some msgid string"
result = self._lazy_gettext(msgid)
self.assertEqual(result, msgid)
def test_message_not_equal(self):
msgid = "Some msgid string"
result = self._lazy_gettext(msgid)
self.assertNotEqual(result, "Other string %s" % msgid)
def test_message_equal_with_param(self):
msgid = "Some string with params: %s"
params = (0, )
message = msgid % params
result = self._lazy_gettext(msgid) % params
self.assertEqual(result, message)
result_str = '%s' % result
self.assertEqual(result_str, message)
def test_message_injects_nonetype(self):
msgid = "Some string with param: %s"
params = None
message = msgid % params
result = self._lazy_gettext(msgid) % params
self.assertEqual(result, message)
result_str = '%s' % result
self.assertIn('None', result_str)
self.assertEqual(result_str, message)
def test_message_iterate(self):
msgid = "Some string with params: %s"
params = 'blah'
message = msgid % params
result = self._lazy_gettext(msgid) % params
# compare using iterators
for (c1, c2) in zip(result, message):
self.assertEqual(c1, c2)
def test_message_equal_with_dec_param(self):
"""Verify we can inject numbers into Messages."""
msgid = "Some string with params: %d"
params = [0, 1, 10, 24124]
messages = []
results = []
for param in params:
messages.append(msgid % param)
results.append(self._lazy_gettext(msgid) % param)
for message, result in zip(messages, results):
self.assertEqual(type(result), gettextutils.Message)
self.assertEqual(result, message)
# simulate writing out as string
result_str = '%s' % result
self.assertEqual(result_str, message)
def test_message_equal_with_extra_params(self):
msgid = "Some string with params: %(param1)s %(param2)s"
params = {'param1': 'test',
'param2': 'test2',
'param3': 'notinstring'}
result = self._lazy_gettext(msgid) % params
self.assertEqual(result, msgid % params)
def test_message_object_param_copied(self):
"""Verify that injected parameters get copied."""
some_obj = SomeObject()
some_obj.tag = 'stub_object'
msgid = "Found object: %(some_obj)s"
result = self._lazy_gettext(msgid) % {'some_obj': some_obj}
old_some_obj = copy.copy(some_obj)
some_obj.tag = 'switched_tag'
self.assertEqual(result, msgid % {'some_obj': old_some_obj})
def test_interpolation_with_missing_param(self):
msgid = ("Some string with params: %(param1)s %(param2)s"
" and a missing one %(missing)s")
params = {'param1': 'test',
'param2': 'test2'}
test_me = lambda: self._lazy_gettext(msgid) % params
self.assertRaises(KeyError, test_me)
def test_operator_add(self):
msgid = "Some msgid string"
result = self._lazy_gettext(msgid)
additional = " with more added"
expected = msgid + additional
result = result + additional
self.assertEqual(type(result), gettextutils.Message)
self.assertEqual(result, expected)
def test_operator_radd(self):
msgid = "Some msgid string"
result = self._lazy_gettext(msgid)
additional = " with more added"
expected = additional + msgid
result = additional + result
self.assertEqual(type(result), gettextutils.Message)
self.assertEqual(result, expected)
def test_get_index(self):
msgid = "Some msgid string"
result = self._lazy_gettext(msgid)
expected = 'm'
result = result[2]
self.assertEqual(result, expected)
def test_getitem_string(self):
"""Verify using string indexes on Message does not work."""
msgid = "Some msgid string"
result = self._lazy_gettext(msgid)
test_me = lambda: result['blah']
self.assertRaises(TypeError, test_me)
def test_contains(self):
msgid = "Some msgid string"
result = self._lazy_gettext(msgid)
self.assertIn('msgid', result)
self.assertNotIn('blah', result)
def test_locale_set_does_translation(self):
msgid = "Some msgid string"
result = self._lazy_gettext(msgid)
result.domain = 'test_domain'
result.locale = 'test_locale'
os.environ['TEST_DOMAIN_LOCALEDIR'] = '/tmp/blah'
self.mox.StubOutWithMock(gettext, 'translation')
fake_lang = self.mox.CreateMock(gettext.GNUTranslations)
gettext.translation('test_domain',
languages=['test_locale'],
fallback=True,
localedir='/tmp/blah').AndReturn(fake_lang)
fake_lang.ugettext(msgid).AndReturn(msgid)
self.mox.ReplayAll()
result = result.data
os.environ.pop('TEST_DOMAIN_LOCALEDIR')
self.assertEqual(msgid, result)
def _get_testmsg_inner_params(self):
return {'params': {'test1': 'blah1',
'test2': 'blah2',
'test3': SomeObject()},
'domain': 'test_domain',
'locale': 'en_US',
'_left_extra_msg': 'Extra. ',
'_right_extra_msg': '. More Extra.'}
def _get_full_test_message(self):
msgid = "Some msgid string: %(test1)s %(test2)s %(test3)s"
message = self._lazy_gettext(msgid)
attrs = self._get_testmsg_inner_params()
for (k, v) in attrs.items():
setattr(message, k, v)
return copy.deepcopy(message)
def test_message_copyable(self):
message = self._get_full_test_message()
copied_msg = copy.copy(message)
self.assertIsNot(message, copied_msg)
for k in self._get_testmsg_inner_params():
self.assertEqual(getattr(message, k),
getattr(copied_msg, k))
self.assertEqual(message, copied_msg)
message._msg = 'Some other msgid string'
self.assertNotEqual(message, copied_msg)
def test_message_copy_deepcopied(self):
message = self._get_full_test_message()
inner_obj = SomeObject()
message.params['test3'] = inner_obj
copied_msg = copy.copy(message)
self.assertIsNot(message, copied_msg)
inner_obj.tag = 'different'
self.assertNotEqual(message, copied_msg)
def test_add_returns_copy(self):
msgid = "Some msgid string: %(test1)s %(test2)s"
message = self._lazy_gettext(msgid)
m1 = '10 ' + message + ' 10'
m2 = '20 ' + message + ' 20'
self.assertIsNot(message, m1)
self.assertIsNot(message, m2)
self.assertIsNot(m1, m2)
self.assertEqual(m1, '10 %s 10' % msgid)
self.assertEqual(m2, '20 %s 20' % msgid)
def test_mod_returns_copy(self):
msgid = "Some msgid string: %(test1)s %(test2)s"
message = self._lazy_gettext(msgid)
m1 = message % {'test1': 'foo', 'test2': 'bar'}
m2 = message % {'test1': 'foo2', 'test2': 'bar2'}
self.assertIsNot(message, m1)
self.assertIsNot(message, m2)
self.assertIsNot(m1, m2)
self.assertEqual(m1, msgid % {'test1': 'foo', 'test2': 'bar'})
self.assertEqual(m2, msgid % {'test1': 'foo2', 'test2': 'bar2'})
def test_comparator_operators(self):
"""Verify Message comparison is equivalent to string comparision."""
m1 = self._get_full_test_message()
m2 = copy.deepcopy(m1)
m3 = "1" + m1
# m1 and m2 are equal
self.assertEqual(m1 >= m2, str(m1) >= str(m2))
self.assertEqual(m1 <= m2, str(m1) <= str(m2))
self.assertEqual(m2 >= m1, str(m2) >= str(m1))
self.assertEqual(m2 <= m1, str(m2) <= str(m1))
# m1 is greater than m3
self.assertEqual(m1 >= m3, str(m1) >= str(m3))
self.assertEqual(m1 > m3, str(m1) > str(m3))
# m3 is not greater than m1
self.assertEqual(m3 >= m1, str(m3) >= str(m1))
self.assertEqual(m3 > m1, str(m3) > str(m1))
# m3 is less than m1
self.assertEqual(m3 <= m1, str(m3) <= str(m1))
self.assertEqual(m3 < m1, str(m3) < str(m1))
# m3 is not less than m1
self.assertEqual(m1 <= m3, str(m1) <= str(m3))
self.assertEqual(m1 < m3, str(m1) < str(m3))
def test_mul_operator(self):
message = self._get_full_test_message()
message_str = str(message)
self.assertEqual(message * 10, message_str * 10)
self.assertEqual(message * 20, message_str * 20)
self.assertEqual(10 * message, 10 * message_str)
self.assertEqual(20 * message, 20 * message_str)
def test_to_unicode(self):
message = self._get_full_test_message()
message_str = unicode(message)
self.assertEqual(message, message_str)
self.assertTrue(isinstance(message_str, unicode))
class LocaleHandlerTestCase(utils.BaseTestCase):
def setUp(self):
super(LocaleHandlerTestCase, self).setUp()
self._lazy_gettext = gettextutils.get_lazy_gettext('oslo')
self.buffer_handler = logging.handlers.BufferingHandler(40)
self.locale_handler = gettextutils.LocaleHandler(
'zh_CN', self.buffer_handler)
self.logger = logging.getLogger('localehander_logger')
self.logger.propogate = False
self.logger.setLevel(logging.DEBUG)
self.logger.addHandler(self.locale_handler)
def test_emit_message(self):
msgid = 'Some logrecord message.'
message = self._lazy_gettext(msgid)
self.emit_called = False
def emit(record):
self.assertEqual(record.msg.locale, 'zh_CN')
self.assertEqual(record.msg, msgid)
self.assertTrue(isinstance(record.msg,
gettextutils.Message))
self.emit_called = True
self.stubs.Set(self.buffer_handler, 'emit', emit)
self.logger.info(message)
self.assertTrue(self.emit_called)
def test_emit_nonmessage(self):
msgid = 'Some logrecord message.'
self.emit_called = False
def emit(record):
self.assertEqual(record.msg, msgid)
self.assertFalse(isinstance(record.msg,
gettextutils.Message))
self.emit_called = True
self.stubs.Set(self.buffer_handler, 'emit', emit)
self.logger.info(msgid)
self.assertTrue(self.emit_called)
class SomeObject(object):
def __init__(self, tag='default'):
self.tag = tag
def __str__(self):
return self.tag
def __getstate__(self):
return self.__dict__
def __setstate__(self, state):
for (k, v) in state.items():
setattr(self, k, v)
def __eq__(self, other):
if isinstance(other, self.__class__):
return self.tag == other.tag
return False