more fixes and moving to txaio logging

This commit is contained in:
meejah 2015-09-15 14:57:41 -06:00
parent 3a0bdae239
commit e09df6d80a
6 changed files with 69 additions and 75 deletions

View File

@ -28,7 +28,6 @@ from __future__ import absolute_import
import binascii
from twisted.python import log
from twisted.internet.protocol import Factory
from twisted.protocols.basic import Int32StringReceiver
from twisted.internet.error import ConnectionDone
@ -36,6 +35,8 @@ from twisted.internet.error import ConnectionDone
from autobahn.twisted.util import peer2str
from autobahn.wamp.exception import ProtocolError, SerializationError, TransportLost
import txaio
__all__ = (
'WampRawSocketServerProtocol',
'WampRawSocketClientProtocol',
@ -48,10 +49,11 @@ class WampRawSocketProtocol(Int32StringReceiver):
"""
Base class for Twisted-based WAMP-over-RawSocket protocols.
"""
log = txaio.make_logger()
def connectionMade(self):
if self.factory.debug:
log.msg("WampRawSocketProtocol: connection made")
self.log.debug("WampRawSocketProtocol: connection made")
# the peer we are connected to
#
@ -92,42 +94,42 @@ class WampRawSocketProtocol(Int32StringReceiver):
except Exception as e:
# Exceptions raised in onOpen are fatal ..
if self.factory.debug:
log.msg("WampRawSocketProtocol: ApplicationSession constructor / onOpen raised ({0})".format(e))
self.log.info("WampRawSocketProtocol: ApplicationSession constructor / onOpen raised ({0})".format(e))
self.abort()
else:
if self.factory.debug:
log.msg("ApplicationSession started.")
self.log.info("ApplicationSession started.")
def connectionLost(self, reason):
if self.factory.debug:
log.msg("WampRawSocketProtocol: connection lost: reason = '{0}'".format(reason))
self.log.info("WampRawSocketProtocol: connection lost: reason = '{0}'".format(reason))
try:
wasClean = isinstance(reason.value, ConnectionDone)
self._session.onClose(wasClean)
except Exception as e:
# silently ignore exceptions raised here ..
if self.factory.debug:
log.msg("WampRawSocketProtocol: ApplicationSession.onClose raised ({0})".format(e))
self.log.info("WampRawSocketProtocol: ApplicationSession.onClose raised ({0})".format(e))
self._session = None
def stringReceived(self, payload):
if self.factory.debug:
log.msg("WampRawSocketProtocol: RX octets: {0}".format(binascii.hexlify(payload)))
self.log.info("WampRawSocketProtocol: RX octets: {0}".format(binascii.hexlify(payload)))
try:
for msg in self._serializer.unserialize(payload):
if self.factory.debug:
log.msg("WampRawSocketProtocol: RX WAMP message: {0}".format(msg))
self.log.info("WampRawSocketProtocol: RX WAMP message: {0}".format(msg))
self._session.onMessage(msg)
except ProtocolError as e:
log.msg(str(e))
self.log.info(str(e))
if self.factory.debug:
log.msg("WampRawSocketProtocol: WAMP Protocol Error ({0}) - aborting connection".format(e))
self.log.info("WampRawSocketProtocol: WAMP Protocol Error ({0}) - aborting connection".format(e))
self.abort()
except Exception as e:
if self.factory.debug:
log.msg("WampRawSocketProtocol: WAMP Internal Error ({0}) - aborting connection".format(e))
self.log.info("WampRawSocketProtocol: WAMP Internal Error ({0}) - aborting connection".format(e))
self.abort()
def send(self, msg):
@ -136,7 +138,7 @@ class WampRawSocketProtocol(Int32StringReceiver):
"""
if self.isOpen():
if self.factory.debug:
log.msg("WampRawSocketProtocol: TX WAMP message: {0}".format(msg))
self.log.info("WampRawSocketProtocol: TX WAMP message: {0}".format(msg))
try:
payload, _ = self._serializer.serialize(msg)
except Exception as e:
@ -145,7 +147,7 @@ class WampRawSocketProtocol(Int32StringReceiver):
else:
self.sendString(payload)
if self.factory.debug:
log.msg("WampRawSocketProtocol: TX octets: {0}".format(binascii.hexlify(payload)))
self.log.info("WampRawSocketProtocol: TX octets: {0}".format(binascii.hexlify(payload)))
else:
raise TransportLost()
@ -194,18 +196,18 @@ class WampRawSocketServerProtocol(WampRawSocketProtocol):
if len(self._handshake_bytes) == 4:
if self.factory.debug:
log.msg("WampRawSocketProtocol: opening handshake received - {0}".format(binascii.b2a_hex(self._handshake_bytes)))
self.log.info("WampRawSocketProtocol: opening handshake received - {0}".format(binascii.b2a_hex(self._handshake_bytes)))
if ord(self._handshake_bytes[0]) != 0x7f:
if self.factory.debug:
log.msg("WampRawSocketProtocol: invalid magic byte (octet 1) in opening handshake: was 0x{0}, but expected 0x7f".format(binascii.b2a_hex(self._handshake_bytes[0])))
self.log.info("WampRawSocketProtocol: invalid magic byte (octet 1) in opening handshake: was 0x{0}, but expected 0x7f".format(binascii.b2a_hex(self._handshake_bytes[0])))
self.abort()
# peer requests us to send messages of maximum length 2**max_len_exp
#
self._max_len_send = 2 ** (9 + (ord(self._handshake_bytes[1]) >> 4))
if self.factory.debug:
log.msg("WampRawSocketProtocol: client requests us to send out most {} bytes per message".format(self._max_len_send))
self.log.info("WampRawSocketProtocol: client requests us to send out most {} bytes per message".format(self._max_len_send))
# client wants to speak this serialization format
#
@ -213,10 +215,10 @@ class WampRawSocketServerProtocol(WampRawSocketProtocol):
if ser_id in self.factory._serializers:
self._serializer = self.factory._serializers[ser_id]
if self.factory.debug:
log.msg("WampRawSocketProtocol: client wants to use serializer {}".format(ser_id))
self.log.info("WampRawSocketProtocol: client wants to use serializer {}".format(ser_id))
else:
if self.factory.debug:
log.msg("WampRawSocketProtocol: opening handshake - no suitable serializer found (client requested {0}, and we have {1})".format(ser_id, self.factory._serializers.keys()))
self.log.info("WampRawSocketProtocol: opening handshake - no suitable serializer found (client requested {0}, and we have {1})".format(ser_id, self.factory._serializers.keys()))
self.abort()
# we request the peer to send message of maximum length 2**reply_max_len_exp
@ -235,7 +237,7 @@ class WampRawSocketServerProtocol(WampRawSocketProtocol):
self._on_handshake_complete()
if self.factory.debug:
log.msg("WampRawSocketProtocol: opening handshake completed", self._serializer)
self.log.info("WampRawSocketProtocol: opening handshake completed", self._serializer)
# consume any remaining data received already ..
#
@ -275,25 +277,25 @@ class WampRawSocketClientProtocol(WampRawSocketProtocol):
if len(self._handshake_bytes) == 4:
if self.factory.debug:
log.msg("WampRawSocketProtocol: opening handshake received - {0}".format(binascii.b2a_hex(self._handshake_bytes)))
self.log.info("WampRawSocketProtocol: opening handshake received - {0}".format(binascii.b2a_hex(self._handshake_bytes)))
if ord(self._handshake_bytes[0]) != 0x7f:
if self.factory.debug:
log.msg("WampRawSocketProtocol: invalid magic byte (octet 1) in opening handshake: was 0x{0}, but expected 0x7f".format(binascii.b2a_hex(self._handshake_bytes[0])))
self.log.info("WampRawSocketProtocol: invalid magic byte (octet 1) in opening handshake: was 0x{0}, but expected 0x7f".format(binascii.b2a_hex(self._handshake_bytes[0])))
self.abort()
# peer requests us to send messages of maximum length 2**max_len_exp
#
self._max_len_send = 2 ** (9 + (ord(self._handshake_bytes[1]) >> 4))
if self.factory.debug:
log.msg("WampRawSocketProtocol: server requests us to send out most {} bytes per message".format(self._max_len_send))
self.log.info("WampRawSocketProtocol: server requests us to send out most {} bytes per message".format(self._max_len_send))
# client wants to speak this serialization format
#
ser_id = ord(self._handshake_bytes[1]) & 0x0F
if ser_id != self._serializer.RAWSOCKET_SERIALIZER_ID:
if self.factory.debug:
log.msg("WampRawSocketProtocol: opening handshake - no suitable serializer found (server replied {0}, and we requested {1})".format(ser_id, self._serializer.RAWSOCKET_SERIALIZER_ID))
self.log.info("WampRawSocketProtocol: opening handshake - no suitable serializer found (server replied {0}, and we requested {1})".format(ser_id, self._serializer.RAWSOCKET_SERIALIZER_ID))
self.abort()
self._handshake_complete = True
@ -301,7 +303,7 @@ class WampRawSocketClientProtocol(WampRawSocketProtocol):
self._on_handshake_complete()
if self.factory.debug:
log.msg("WampRawSocketProtocol: opening handshake completed", self._serializer)
self.log.info("WampRawSocketProtocol: opening handshake completed", self._serializer)
# consume any remaining data received already ..
#

View File

@ -30,7 +30,6 @@ import inspect
import six
from twisted.python import log
from twisted.internet.defer import inlineCallbacks
from autobahn.wamp import protocol
@ -67,11 +66,9 @@ class ApplicationSession(protocol.ApplicationSession):
"""
Override of wamp.ApplicationSession
"""
# see docs; will print currently-active exception to the logs,
# which is just what we want.
log.err(e)
self.log.error(txaio.failure_format_traceback(txaio.create_future_error(e)))
# also log the framework-provided error-message
log.err(msg)
self.log.error(msg)
class ApplicationSessionFactory(protocol.ApplicationSessionFactory):
@ -168,7 +165,10 @@ class ApplicationRunner(object):
isSecure, host, port, resource, path, params = parseWsUrl(self.url)
txaio.start_logging(level='debug')
if self.debug or self.debug_wamp or self.debug_app:
txaio.start_logging(level='debug')
else:
txaio.start_logging(level='info')
# factory for use ApplicationSession
def create():
@ -178,7 +178,7 @@ class ApplicationRunner(object):
except Exception as e:
if start_reactor:
# the app component could not be created .. fatal
log.err(str(e))
self.log.error(str(e))
reactor.stop()
else:
# if we didn't start the reactor, it's up to the
@ -526,7 +526,7 @@ class Application(object):
yield handler(*args, **kwargs)
except Exception as e:
# FIXME
log.msg("Warning: exception in signal handler swallowed", e)
self.log.info("Warning: exception in signal handler swallowed", e)
if service:

View File

@ -520,8 +520,8 @@ class ApplicationSession(BaseSession):
:param msg: an informative message from the library. It is
suggested you log this immediately after the exception.
"""
traceback.print_exc()
print(msg)
self.log.error(txaio.failure_format_traceback(txaio.create_future_error(e)))
self.log.error(msg)
def _swallow_error(self, fail, msg):
'''
@ -535,7 +535,6 @@ class ApplicationSession(BaseSession):
chain for a Deferred/coroutine that will make it out to user
code.
'''
# print("_swallow_error", typ, exc, tb)
try:
self.onUserError(fail.value, msg)
except:
@ -546,7 +545,6 @@ class ApplicationSession(BaseSession):
"""
Implements :func:`autobahn.wamp.interfaces.ITransportHandler.onMessage`
"""
print("BLAM", id(self.log), type(self.log))
self.log.debug("onMessage: {message}", session_id=self._session_id, message=msg)
self.log.trace("onMessage: {message}", session_id=self._session_id, message=msg)
if self._session_id is None:
@ -814,11 +812,7 @@ class ApplicationSession(BaseSession):
pass
formatted_tb = None
if self.traceback_app:
# if asked to marshal the traceback within the WAMP error message, extract it
# noinspection PyCallingNonCallable
tb = StringIO()
err.printTraceback(file=tb)
formatted_tb = tb.getvalue().splitlines()
formatted_tb = txaio.failure_format_traceback(err)
del self._invocations[msg.request]
@ -972,7 +966,7 @@ class ApplicationSession(BaseSession):
Implements :func:`autobahn.wamp.interfaces.ISession.onLeave`
"""
if details.reason.startswith('wamp.error.'):
print('{error}: {message}'.format(error=details.reason, message=details.message))
self.log.error('{reason}: {message}', reason=details.reason, message=details.message)
if self._transport:
self.disconnect()
# do we ever call onLeave with a valid transport?

View File

@ -32,7 +32,6 @@ if os.environ.get('USE_TWISTED', False):
from twisted.internet.defer import inlineCallbacks, Deferred, returnValue
from twisted.internet.defer import succeed, DeferredList
from twisted.python import log
from twisted.trial import unittest
from six import PY3
@ -463,37 +462,28 @@ if os.environ.get('USE_TWISTED', False):
error_instance = RuntimeError("we have a problem")
got_err_d = Deferred()
def observer(kw):
if kw['isError'] and 'failure' in kw:
fail = kw['failure']
fail.trap(RuntimeError)
if error_instance == fail.value:
got_err_d.callback(True)
log.addObserver(observer)
def observer(e, msg):
if error_instance == e:
got_err_d.callback(True)
handler.onUserError = observer
def boom():
raise error_instance
try:
sub = yield handler.subscribe(boom, u'com.myapp.topic1')
sub = yield handler.subscribe(boom, u'com.myapp.topic1')
# MockTransport gives us the ack reply and then we do our
# own event message
publish = yield handler.publish(
u'com.myapp.topic1',
options=types.PublishOptions(acknowledge=True, exclude_me=False),
)
msg = message.Event(sub.id, publish.id)
handler.onMessage(msg)
# MockTransport gives us the ack reply and then we do our
# own event message
publish = yield handler.publish(
u'com.myapp.topic1',
options=types.PublishOptions(acknowledge=True, exclude_me=False),
)
msg = message.Event(sub.id, publish.id)
handler.onMessage(msg)
# we know it worked if our observer worked and did
# .callback on our Deferred above.
self.assertTrue(got_err_d.called)
# ...otherwise trial will fail the test anyway
self.flushLoggedErrors()
finally:
log.removeObserver(observer)
# we know it worked if our observer worked and did
# .callback on our Deferred above.
self.assertTrue(got_err_d.called)
@inlineCallbacks
def test_unsubscribe(self):
@ -598,6 +588,10 @@ if os.environ.get('USE_TWISTED', False):
handler = ApplicationSession()
handler.traceback_app = True
MockTransport(handler)
errors = []
def log_error(e, msg):
errors.append((e, msg))
handler.onUserError = log_error
name_error = NameError('foo')
@ -618,9 +612,8 @@ if os.environ.get('USE_TWISTED', False):
# also, we should have logged the real NameError to
# Twisted.
errs = self.flushLoggedErrors()
self.assertEqual(1, len(errs))
self.assertEqual(name_error, errs[0].value)
self.assertEqual(1, len(errors))
self.assertEqual(name_error, errors[0][0].value)
@inlineCallbacks
def test_invoke_progressive_result(self):
@ -674,6 +667,10 @@ if os.environ.get('USE_TWISTED', False):
got_progress = Deferred()
progress_error = NameError('foo')
logged_errors = []
def got_error(e, msg):
logged_errors.append((e, msg))
handler.onUserError = got_error
def progress(arg, something=None):
self.assertEqual('nothing', something)
@ -693,15 +690,15 @@ if os.environ.get('USE_TWISTED', False):
options=types.CallOptions(on_progress=progress),
key='word',
)
self.assertEqual(42, res)
# our progress handler raised an error, but not before
# recording success.
self.assertTrue(got_progress.called)
self.assertEqual('life', got_progress.result)
# make sure our progress-handler error was logged
errs = self.flushLoggedErrors()
self.assertEqual(1, len(errs))
self.assertEqual(progress_error, errs[0].value)
self.assertEqual(1, len(logged_errors))
self.assertEqual(progress_error, logged_errors[0][0])
@inlineCallbacks
def test_invoke_progressive_result_no_args(self):

View File

@ -28,6 +28,7 @@ from __future__ import absolute_import, print_function
import os
import unittest2 as unittest
import txaio
if os.environ.get('USE_TWISTED', False):
from mock import patch

View File

@ -18,7 +18,7 @@ deps =
unittest2
coverage
msgpack-python
../txaio
git+https://github.com/meejah/txaio@issue8-squash
; twisted dependencies
twtrunk: https://github.com/twisted/twisted/archive/trunk.zip