Newapi asyncio component (#872)

* start of asyncio side of component API

* copy-pasta the authentication stuff for now

* add 'trust_root' to TLS config, fix generic component errors

* change MyAuthorizer to match component examples

* new certificate; expired

* move auth code to central place, refactor general new-Session code

* fixes to asyncio, unix, some flake8

* refactor authentication API

add_authenticator now takes an instance of IAuthenticator
that we construct with a factory function

* unused imports

* no qa on invalid syntax for py 2
This commit is contained in:
Tobias Oberstein 2017-07-19 14:15:26 +02:00 committed by GitHub
parent 15500e2c21
commit 0af895bff0
13 changed files with 908 additions and 176 deletions

View File

@ -0,0 +1,460 @@
###############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) Crossbar.io Technologies GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################
from __future__ import absolute_import, print_function
import six
import ssl # XXX what Python version is this always available at?
import signal
import itertools
from functools import partial
try:
import asyncio
except ImportError:
# Trollius >= 0.3 was renamed to asyncio
# noinspection PyUnresolvedReferences
import trollius as asyncio
import txaio
txaio.use_asyncio() # noqa
from autobahn.asyncio.websocket import WampWebSocketClientFactory
from autobahn.asyncio.rawsocket import WampRawSocketClientFactory
from autobahn.wamp import component
from autobahn.asyncio.wamp import Session
from autobahn.wamp.exception import ApplicationError
__all__ = ('Component',)
def _is_ssl_error(e):
"""
Internal helper.
"""
return isinstance(e, ssl.SSLError)
def _unique_list(seq):
"""
Return a list with unique elements from sequence, preserving order.
"""
seen = set()
return [x for x in seq if x not in seen and not seen.add(x)]
def _create_transport_serializer(serializer_id):
if serializer_id in [u'msgpack', u'mgspack.batched']:
# try MsgPack WAMP serializer
try:
from autobahn.wamp.serializer import MsgPackSerializer
except ImportError:
pass
else:
if serializer_id == u'mgspack.batched':
return MsgPackSerializer(batched=True)
else:
return MsgPackSerializer()
if serializer_id in [u'json', u'json.batched']:
# try JSON WAMP serializer
try:
from autobahn.wamp.serializer import JsonSerializer
except ImportError:
pass
else:
if serializer_id == u'json.batched':
return JsonSerializer(batched=True)
else:
return JsonSerializer()
raise RuntimeError('could not create serializer for "{}"'.format(serializer_id))
def _create_transport_serializers(transport):
"""
Create a list of serializers to use with a WAMP protocol factory.
"""
serializers = []
for serializer_id in transport.serializers:
if serializer_id == u'msgpack':
# try MsgPack WAMP serializer
try:
from autobahn.wamp.serializer import MsgPackSerializer
except ImportError:
pass
else:
serializers.append(MsgPackSerializer(batched=True))
serializers.append(MsgPackSerializer())
elif serializer_id == u'json':
# try JSON WAMP serializer
try:
from autobahn.wamp.serializer import JsonSerializer
except ImportError:
pass
else:
serializers.append(JsonSerializer(batched=True))
serializers.append(JsonSerializer())
else:
raise RuntimeError(
"Unknown serializer '{}'".format(serializer_id)
)
return serializers
def _camel_case_from_snake_case(snake):
parts = snake.split('_')
return parts[0] + ''.join([s.capitalize() for s in parts[1:]])
def _create_transport_factory(loop, transport, session_factory):
"""
Create a WAMP-over-XXX transport factory.
"""
if transport.type == u'websocket':
serializers = _create_transport_serializers(transport)
factory = WampWebSocketClientFactory(session_factory, url=transport.url, serializers=serializers)
elif transport.type == u'rawsocket':
serializer = _create_transport_serializer(transport.serializers[0])
factory = WampRawSocketClientFactory(session_factory, serializer=serializer)
else:
assert(False), 'should not arrive here'
# set the options one at a time so we can give user better feedback
for k, v in transport.options.items():
try:
factory.setProtocolOptions(**{k: v})
except (TypeError, KeyError):
# this allows us to document options as snake_case
# until everything internally is upgraded from
# camelCase
try:
factory.setProtocolOptions(
**{_camel_case_from_snake_case(k): v}
)
except (TypeError, KeyError):
raise ValueError(
"Unknown {} transport option: {}={}".format(transport.type, k, v)
)
return factory
class Component(component.Component):
"""
A component establishes a transport and attached a session
to a realm using the transport for communication.
The transports a component tries to use can be configured,
as well as the auto-reconnect strategy.
"""
log = txaio.make_logger()
session_factory = Session
"""
The factory of the session we will instantiate.
"""
def _check_native_endpoint(self, endpoint):
if isinstance(endpoint, dict):
if u'tls' in endpoint:
tls = endpoint[u'tls']
if isinstance(tls, (dict, bool)):
pass
elif isinstance(tls, ssl.SSLContext):
pass
else:
raise ValueError(
"'tls' configuration must be a dict, bool or "
"SSLContext instance"
)
else:
raise ValueError(
"'endpoint' configuration must be a dict or IStreamClientEndpoint"
" provider"
)
def _connect_transport(self, loop, transport, session_factory):
coro = self._coro_connect_transport(loop, transport, session_factory)
return asyncio.Task(coro)
@asyncio.coroutine
def _coro_connect_transport(self, loop, transport, session_factory):
"""
Create and connect a WAMP-over-XXX transport.
"""
factory = _create_transport_factory(loop, transport, session_factory)
if transport.endpoint[u'type'] == u'tcp':
version = transport.endpoint.get(u'version', 4)
if version not in [4, 6]:
raise ValueError('invalid IP version {} in client endpoint configuration'.format(version))
host = transport.endpoint[u'host']
if type(host) != six.text_type:
raise ValueError('invalid type {} for host in client endpoint configuration'.format(type(host)))
port = transport.endpoint[u'port']
if type(port) not in six.integer_types:
raise ValueError('invalid type {} for port in client endpoint configuration'.format(type(port)))
timeout = transport.endpoint.get(u'timeout', 10) # in seconds
if type(timeout) not in six.integer_types:
raise ValueError('invalid type {} for timeout in client endpoint configuration'.format(type(timeout)))
tls = transport.endpoint.get(u'tls', None)
tls_hostname = None
# create a TLS enabled connecting TCP socket
if tls:
if isinstance(tls, dict):
for k in tls.keys():
if k not in [u"hostname", u"trust_root"]:
raise ValueError("Invalid key '{}' in 'tls' config".format(k))
hostname = tls.get(u'hostname', host)
if type(hostname) != six.text_type:
raise ValueError('invalid type {} for hostname in TLS client endpoint configuration'.format(hostname))
cert_fname = tls.get(u'trust_root', None)
tls_hostname = hostname
tls = True
if cert_fname is not None:
tls = ssl.create_default_context(
purpose=ssl.Purpose.SERVER_AUTH,
cafile=cert_fname,
)
elif isinstance(tls, ssl.SSLContext):
# tls=<an SSLContext> is valid
tls_hostname = host
elif tls in [False, True]:
if tls:
tls_hostname = host
else:
raise RuntimeError('unknown type {} for "tls" configuration in transport'.format(type(tls)))
f = loop.create_connection(
protocol_factory=factory,
host=host,
port=port,
ssl=tls,
server_hostname=tls_hostname,
)
transport, protocol = yield from asyncio.wait_for(f, timeout=timeout) # noqa
elif transport.endpoint[u'type'] == u'unix':
path = transport.endpoint[u'path']
timeout = int(transport.endpoint.get(u'timeout', 10)) # in seconds
f = loop.create_unix_connection(
protocol_factory=factory,
path=path,
)
transport, protocol = yield from asyncio.wait_for(f, timeout=timeout)
else:
assert(False), 'should not arrive here'
@asyncio.coroutine
def start(self, loop=None):
"""
This starts the Component, which means it will start connecting
(and re-connecting) to its configured transports. A Component
runs until it is "done", which means one of:
- There was a "main" function defined, and it completed successfully;
- Something called ``.leave()`` on our session, and we left successfully;
- ``.stop()`` was called, and completed successfully;
- none of our transports were able to connect successfully (failure);
:returns: a Deferred that fires (with ``None``) when we are
"done" or with a Failure if something went wrong.
"""
if loop is None:
self.log.warn("Using default loop")
loop = asyncio.get_default_loop()
yield from self.fire('start', loop, self)
# transports to try again and again ..
transport_gen = itertools.cycle(self._transports)
reconnect = True
self.log.debug('Entering re-connect loop')
while reconnect:
# cycle through all transports forever ..
transport = next(transport_gen)
# only actually try to connect using the transport,
# if the transport hasn't reached max. connect count
if transport.can_reconnect():
delay = transport.next_delay()
self.log.debug(
'trying transport {transport_idx} using connect delay {transport_delay}',
transport_idx=transport.idx,
transport_delay=delay,
)
yield from asyncio.sleep(delay)
try:
transport.connect_attempts += 1
yield from self._connect_once(loop, transport)
transport.connect_sucesses += 1
except asyncio.CancelledError:
reconnect = False
break
except Exception as e:
transport.connect_failures += 1
f = txaio.create_failure()
self.log.error(u'component failed: {error}', error=txaio.failure_message(f))
self.log.error(u'{tb}', tb=txaio.failure_format_traceback(f))
# If this is a "fatal error" that will never work,
# we bail out now
if isinstance(e, ApplicationError):
if e.error in [u'wamp.error.no_such_realm']:
reconnect = False
self.log.error(u"Fatal error, not reconnecting")
# The thinking here is that we really do
# want to 'raise' (and thereby fail the
# entire "start" / reconnect loop) because
# if the realm isn't valid, we're "never"
# going to succeed...
raise
self.log.error(u"{msg}", msg=e.error_message())
elif _is_ssl_error(e):
# Quoting pyOpenSSL docs: "Whenever
# [SSL.Error] is raised directly, it has a
# list of error messages from the OpenSSL
# error queue, where each item is a tuple
# (lib, function, reason). Here lib, function
# and reason are all strings, describing where
# and what the problem is. See err(3) for more
# information."
self.log.error(u"TLS failure: {reason}", reason=e.args[1])
self.log.error(u"Marking this transport as failed")
transport.failed()
else:
f = txaio.create_failure()
self.log.error(
u'Connection failed: {error}',
error=txaio.failure_message(f),
)
# some types of errors should probably have
# stacktraces logged immediately at error
# level, e.g. SyntaxError?
self.log.debug(u'{tb}', tb=txaio.failure_format_traceback(f))
else:
self.log.debug(u"Not reconnecting")
reconnect = False
else:
# check if there is any transport left we can use
# to connect
if not self._can_reconnect():
self.log.info("No remaining transports to try")
reconnect = False
def stop(self):
return self._session.leave()
def run(components, log_level='info'):
"""
High-level API to run a series of components.
This will only return once all the components have stopped
(including, possibly, after all re-connections have failed if you
have re-connections enabled). Under the hood, this calls
XXX fixme for asyncio
-- if you wish to manage the loop loop yourself, use the
:meth:`autobahn.asyncio.component.Component.start` method to start
each component yourself.
:param components: the Component(s) you wish to run
:type components: Component or list of Components
:param log_level: a valid log-level (or None to avoid calling start_logging)
:type log_level: string
"""
# actually, should we even let people "not start" the logging? I'm
# not sure that's wise... (double-check: if they already called
# txaio.start_logging() what happens if we call it again?)
if log_level is not None:
txaio.start_logging(level=log_level)
loop = asyncio.get_event_loop()
log = txaio.make_logger()
# see https://github.com/python/asyncio/issues/341 asyncio has
# "odd" handling of KeyboardInterrupt when using Tasks (as
# run_until_complete does). Another option is to just resture
# default SIGINT handling, which is to exit:
# import signal
# signal.signal(signal.SIGINT, signal.SIG_DFL)
@asyncio.coroutine
def exit():
loop.stop()
def nicely_exit(signal):
log.info("Shutting down due to {signal}", signal=signal)
for task in asyncio.Task.all_tasks():
task.cancel()
asyncio.ensure_future(exit())
print(partial(nicely_exit, 'SIGINT'))
loop.add_signal_handler(signal.SIGINT, partial(nicely_exit, 'SIGINT'))
loop.add_signal_handler(signal.SIGTERM, partial(nicely_exit, 'SIGTERM'))
# returns a future; could run_until_complete() but see below
component._run(loop, components)
try:
loop.run_forever()
# this is probably more-correct, but then you always get
# "Event loop stopped before Future completed":
# loop.run_until_complete(f)
except asyncio.CancelledError:
pass
# finally:
# signal.signal(signal.SIGINT, signal.SIG_DFL)
# signal.signal(signal.SIGTERM, signal.SIG_DFL)

View File

@ -278,3 +278,20 @@ class ApplicationRunner(object):
loop.run_until_complete(protocol._session.leave())
loop.close()
class Session(protocol._SessionShim):
# XXX these methods are redundant, but put here for possibly
# better clarity; maybe a bad idea.
def on_join(self, details):
pass
def on_leave(self, details):
self.disconnect()
def on_connect(self):
self.join(self.config.realm)
def on_disconnect(self):
pass

View File

@ -37,7 +37,7 @@ from twisted.internet.endpoints import TCP4ClientEndpoint
try:
_TLS = True
from twisted.internet.endpoints import SSL4ClientEndpoint
from twisted.internet.ssl import optionsForClientTLS, CertificateOptions
from twisted.internet.ssl import optionsForClientTLS, CertificateOptions, Certificate
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
from OpenSSL import SSL
except ImportError as e:
@ -222,11 +222,17 @@ def _create_transport_endpoint(reactor, endpoint_config):
context = IOpenSSLClientConnectionCreator(tls)
elif isinstance(tls, dict):
for k in tls.keys():
if k not in [u"hostname", u"trust_root"]:
raise ValueError("Invalid key '{}' in 'tls' config".format(k))
hostname = tls.get(u'hostname', host)
if type(hostname) != six.text_type:
raise ValueError('invalid type {} for hostname in TLS client endpoint configuration'.format(hostname))
context = optionsForClientTLS(hostname)
trust_root = None
cert_fname = tls.get(u"trust_root", None)
if cert_fname is not None:
trust_root = Certificate.loadPEM(six.u(open(cert_fname, 'r').read()))
context = optionsForClientTLS(hostname, trustRoot=trust_root)
elif isinstance(tls, CertificateOptions):
context = tls

View File

@ -29,7 +29,6 @@ from __future__ import absolute_import
import six
import inspect
import binascii
from functools import reduce
import txaio
txaio.use_twisted() # noqa
@ -727,127 +726,9 @@ if service:
# new API
class Session(ApplicationSession):
# shim that lets us present pep8 API for user-classes to override,
# but also backwards-compatible for existing code using
# ApplicationSession "directly".
# XXX note to self: if we release this as "the" API, then we can
# change all internal Autobahn calls to .on_join() etc, and make
# ApplicationSession a subclass of Session -- and it can then be
# separate deprecated and removed, ultimately, if desired.
#: name -> IAuthenticator
_authenticators = None
def onJoin(self, details):
return self.on_join(details)
def onConnect(self):
if self._authenticators:
# authid, authrole *must* match across all authenticators
# (checked in add_authenticator) so these lists are either
# [None] or [None, 'some_authid']
authid = [x._args.get('authid', None) for x in self._authenticators.values()][-1]
authrole = [x._args.get('authrole', None) for x in self._authenticators.values()][-1]
authextra = self._merged_authextra()
self.join(
self.config.realm,
authmethods=self._authenticators.keys(),
authid=authid or u'public',
authrole=authrole or u'default',
authextra=authextra,
)
else:
super(Session, self).onConnect()
def onChallenge(self, challenge):
try:
authenticator = self._authenticators[challenge.method]
except KeyError:
raise RuntimeError(
"Received challenge for unknown authmethod '{}'".format(
challenge.method
)
)
return authenticator.on_challenge(self, challenge)
def onLeave(self, details):
return self.on_leave(details)
def onDisconnect(self):
return self.on_disconnect()
# experimental authentication API
def add_authenticator(self, name, **kw):
if self._authenticators is None:
self._authenticators = {}
try:
auth = {
'cryptosign': AuthCryptoSign,
'wampcra': AuthWampCra,
}[name](**kw)
except KeyError:
raise RuntimeError(
"Unknown authenticator '{}'".format(name)
)
# all authids must match
unique_authids = set([
a._args['authid']
for a in self._authenticators.values()
if 'authid' in a._args
])
if len(unique_authids) > 1:
raise ValueError(
"Inconsistent authids: {}".format(
' '.join(unique_authids),
)
)
# all authroles must match
unique_authroles = set([
a._args['authrole']
for a in self._authenticators.values()
if 'authrole' in a._args
])
if len(unique_authroles) > 1:
raise ValueError(
"Inconsistent authroles: '{}' vs '{}'".format(
' '.join(unique_authroles),
)
)
# can we do anything else other than merge all authextra keys?
# here we check that any duplicate keys have the same values
authextra = kw.get('authextra', {})
merged = self._merged_authextra()
for k, v in merged:
if k in authextra and authextra[k] != v:
raise ValueError(
"Inconsistent authextra values for '{}': '{}' vs '{}'".format(
k, v, authextra[k],
)
)
# validation complete, add it
self._authenticators[name] = auth
def _merged_authextra(self):
authextras = [a._args.get('authextra', {}) for a in self._authenticators.values()]
# for all existing _authenticators, we've already checked that
# if they contain a key it has the same value as all others.
return {
k: v
for k, v in zip(
reduce(lambda x, y: x | set(y.keys()), authextras, set()),
reduce(lambda x, y: x | set(y.values()), authextras, set())
)
}
# these are the actual "new API" methods (i.e. snake_case)
#
class Session(protocol._SessionShim):
# XXX these methods are redundant, but put here for possibly
# better clarity; maybe a bad idea.
def on_join(self, details):
pass
@ -855,6 +736,9 @@ class Session(ApplicationSession):
def on_leave(self, details):
self.disconnect()
def on_connect(self):
self.join(self.config.realm)
def on_disconnect(self):
pass

View File

@ -39,14 +39,136 @@ from struct import Struct
from operator import xor
from itertools import starmap
from autobahn.util import public
from autobahn.wamp.interfaces import IAuthenticator
__all__ = (
'AuthCryptoSign',
'AuthWampCra',
'pbkdf2',
'generate_totp_secret',
'compute_totp',
'derive_key',
'generate_wcs',
'compute_wcs')
'compute_wcs',
)
def create_authenticator(name, **kwargs):
"""
Accepts various keys and values to configure an authenticator. The
valid keys depend on the kind of authenticator but all can
understand: `authextra`, `authid` and `authrole`
:return: an instance implementing IAuthenticator with the given
configuration.
"""
try:
klass = {
AuthWampCra.name: AuthWampCra,
AuthCryptoSign.name: AuthCryptoSign,
}[name]
except KeyError:
raise ValueError(
"Unknown authenticator '{}'".format(name)
)
# this may raise further ValueErrors if the kwargs are wrong
authenticator = klass(**kwargs)
return authenticator
# experimental authentication API
class AuthCryptoSign(object):
name = u'cryptosign'
def __init__(self, **kw):
# should put in checkconfig or similar
for key in kw.keys():
if key not in [u'authextra', u'authid', u'authrole', u'privkey']:
raise ValueError(
"Unexpected key '{}' for {}".format(key, self.__class__.__name__)
)
for key in [u'privkey', u'authid']:
if key not in kw:
raise ValueError(
"Must provide '{}' for cryptosign".format(key)
)
for key in kw.get('authextra', dict()):
if key not in [u'pubkey']:
raise ValueError(
"Unexpected key '{}' in 'authextra'".format(key)
)
from autobahn.wamp.cryptosign import SigningKey
self._privkey = SigningKey.from_key_bytes(
binascii.a2b_hex(kw[u'privkey'])
)
if u'pubkey' in kw.get(u'authextra', dict()):
pubkey = kw[u'authextra'][u'pubkey']
if pubkey != self._privkey.public_key():
raise ValueError(
"Public key doesn't correspond to private key"
)
else:
kw[u'authextra'] = kw.get(u'authextra', dict())
kw[u'authextra'][u'pubkey'] = self._privkey.public_key()
self._args = kw
@property
def authextra(self):
return self._args.get(u'authextra', dict())
def on_challenge(self, session, challenge):
return self._privkey.sign_challenge(session, challenge)
IAuthenticator.register(AuthCryptoSign)
class AuthWampCra(object):
name = u'wampcra'
def __init__(self, **kw):
# should put in checkconfig or similar
for key in kw.keys():
if key not in [u'authextra', u'authid', u'authrole', u'secret']:
raise ValueError(
"Unexpected key '{}' for {}".format(key, self.__class__.__name__)
)
for key in [u'secret', u'authid']:
if key not in kw:
raise ValueError(
"Must provide '{}' for wampcra".format(key)
)
self._args = kw
self._secret = kw.pop(u'secret')
if not isinstance(self._secret, six.text_type):
self._secret = self._secret.decode('utf8')
@property
def authextra(self):
return self._args.get(u'authextra', dict())
def on_challenge(self, session, challenge):
key = self._secret.encode('utf8')
if u'salt' in challenge.extra:
key = derive_key(
key,
challenge.extra['salt'],
challenge.extra['iterations'],
challenge.extra['keylen']
)
signature = compute_wcs(
key,
challenge.extra['challenge'].encode('utf8')
)
return signature.decode('ascii')
IAuthenticator.register(AuthWampCra)
@public

View File

@ -40,6 +40,7 @@ from autobahn.util import ObservableMixin
from autobahn.websocket.util import parse_url
from autobahn.wamp.types import ComponentConfig, SubscribeOptions, RegisterOptions
from autobahn.wamp.exception import SessionNotReady
from autobahn.wamp.auth import create_authenticator
__all__ = ('Connection')
@ -476,7 +477,8 @@ class Component(ObservableMixin):
try:
session = self.session_factory(cfg)
for auth_name, auth_config in self._authentication.items():
session.add_authenticator(auth_name, **auth_config)
authenticator = create_authenticator(auth_name, **auth_config)
session.add_authenticator(authenticator)
except Exception as e:
# couldn't instantiate session calls, which is fatal.
@ -500,7 +502,7 @@ class Component(ObservableMixin):
"session leaving '{details.reason}'",
details=details,
)
if self._entry:
if self._entry and not txaio.is_called(done):
txaio.resolve(done, None)
session.on('leave', on_leave)
@ -573,7 +575,7 @@ class Component(ObservableMixin):
def on_connect_failure(err):
transport.connect_failures += 1
# failed to establish a connection in the first place
done.errback(err)
txaio.reject(done, err)
txaio.add_callbacks(d, on_connect_sucess, None)
txaio.add_callbacks(d, None, on_connect_failure)
@ -665,7 +667,7 @@ def _run(reactor, components):
def component_start(comp):
# the future from start() errbacks if we fail, or callbacks
# when the component is considered "done" (so maybe never)
d = comp.start(reactor)
d = txaio.as_future(comp.start, reactor)
txaio.add_callbacks(
d,
partial(component_success, comp),

View File

@ -29,6 +29,7 @@ from __future__ import absolute_import
import six
import txaio
import inspect
from functools import reduce
from autobahn import wamp
from autobahn.util import public, IdGenerator, ObservableMixin
@ -38,7 +39,7 @@ from autobahn.wamp import types
from autobahn.wamp import role
from autobahn.wamp import exception
from autobahn.wamp.exception import ApplicationError, ProtocolError, SessionNotReady, SerializationError
from autobahn.wamp.interfaces import ISession, IPayloadCodec # noqa
from autobahn.wamp.interfaces import ISession, IPayloadCodec, IAuthenticator # noqa
from autobahn.wamp.types import SessionDetails, CloseDetails, EncodedPayload
from autobahn.wamp.request import \
Publication, \
@ -1520,6 +1521,131 @@ class ApplicationSession(BaseSession):
return on_reply
# this is NOT public; import from either autobahn.asyncio.wamp or
# autobahn.twisted.wamp
class _SessionShim(ApplicationSession):
"""
shim that lets us present pep8 API for user-classes to override,
but also backwards-compatible for existing code using
ApplicationSession "directly".
"""
#: name -> IAuthenticator
_authenticators = None
def onJoin(self, details):
return self.on_join(details)
def onConnect(self):
if self._authenticators:
# authid, authrole *must* match across all authenticators
# (checked in add_authenticator) so these lists are either
# [None] or [None, 'some_authid']
authid = [x._args.get('authid', None) for x in self._authenticators.values()][-1]
authrole = [x._args.get('authrole', None) for x in self._authenticators.values()][-1]
authextra = self._merged_authextra()
self.join(
self.config.realm,
authmethods=list(self._authenticators.keys()),
authid=authid or u'public',
authrole=authrole or u'default',
authextra=authextra,
)
else:
self.on_connect()
def onChallenge(self, challenge):
try:
authenticator = self._authenticators[challenge.method]
except KeyError:
raise RuntimeError(
"Received challenge for unknown authmethod '{}'".format(
challenge.method
)
)
return authenticator.on_challenge(self, challenge)
def onLeave(self, details):
return self.on_leave(details)
def onDisconnect(self):
return self.on_disconnect()
# experimental authentication API
def add_authenticator(self, authenticator):
assert isinstance(authenticator, IAuthenticator)
if self._authenticators is None:
self._authenticators = {}
# before adding this authenticator we need to validate that
# it's consistent with any other authenticators we may have --
# for example, they must all agree on "authid" etc because
# .join() only takes one value for all of those.
def at_most_one(name):
uni = set([
a._args[name]
for a in list(self._authenticators.values()) + [authenticator]
if name in a._args
])
if len(uni) > 1:
raise ValueError(
"Inconsistent {}s: {}".format(
name,
' '.join(uni),
)
)
# all authids must match
at_most_one('authid')
# all authroles must match
at_most_one('authrole')
# can we do anything else other than merge all authextra keys?
# here we check that any duplicate keys have the same values
authextra = authenticator.authextra
merged = self._merged_authextra()
for k, v in merged:
if k in authextra and authextra[k] != v:
raise ValueError(
"Inconsistent authextra values for '{}': '{}' vs '{}'".format(
k, v, authextra[k],
)
)
# validation complete, add it
self._authenticators[authenticator.name] = authenticator
def _merged_authextra(self):
authextras = [a._args.get('authextra', {}) for a in self._authenticators.values()]
# for all existing _authenticators, we've already checked that
# if they contain a key it has the same value as all others.
return {
k: v
for k, v in zip(
reduce(lambda x, y: x | set(y.keys()), authextras, set()),
reduce(lambda x, y: x | set(y.values()), authextras, set())
)
}
# these are the actual "new API" methods (i.e. snake_case)
#
def on_join(self, details):
pass
def on_leave(self, details):
self.disconnect()
def on_connect(self):
self.join(self.config.realm)
def on_disconnect(self):
pass
# ISession.register collides with the abc.ABCMeta.register method
# ISession.register(ApplicationSession)

View File

@ -0,0 +1,3 @@
This demonstrates the WAMP "Component" API which is intended to be a
more functional-style API (that is, instead of the subclassing-based
API of ApplicationSession)

View File

@ -0,0 +1,54 @@
from autobahn.asyncio.component import Component, run
from autobahn.wamp.types import RegisterOptions
import asyncio
import ssl
context = ssl.create_default_context(
purpose=ssl.Purpose.SERVER_AUTH,
cafile='../../../router/.crossbar/server.crt',
)
component = Component(
transports=[
{
"type": "websocket",
"url": u"wss://localhost:8083/ws",
"endpoint": {
"type": "tcp",
"host": "localhost",
"port": 8083,
"tls": context,
},
"options": {
"open_handshake_timeout": 100,
}
},
],
realm=u"crossbardemo",
)
@component.on_join
def join(session, details):
print("joined {}".format(details))
@component.register(
u"example.foo",
options=RegisterOptions(details_arg='details'),
)
@asyncio.coroutine
def foo(*args, **kw):
print("foo({}, {})".format(args, kw))
for x in range(5, 0, -1):
print(" returning in {}".format(x))
yield from asyncio.sleep(1)
print("returning '42'")
return 42
if __name__ == "__main__":
run([component])

View File

@ -0,0 +1,56 @@
from autobahn.asyncio.component import Component, run
from autobahn.wamp.types import RegisterOptions
from autobahn.wamp.exception import ApplicationError
import asyncio
@asyncio.coroutine
def main(reactor, session):
print("Client session={}".format(session))
try:
res = yield from session.register(lambda: None, u"com.foo.private")
print("\n\nregistering 'com.foo.private' should have failed\n\n")
except ApplicationError as e:
print("registering 'com.foo.private' failed as expected: {}".format(e.error))
res = yield from session.register(
lambda: None, u"should.work",
options=RegisterOptions(match=u'exact'),
)
print("registered 'should.work' with id {}".format(res.id))
try:
res = yield from session.register(
lambda: None, u"prefix.fail.",
options=RegisterOptions(match=u'prefix'),
)
print("\n\nshould have failed\n\n")
except ApplicationError as e:
print("prefix-match 'prefix.fail.' failed as expected: {}".format(e.error))
print("calling 'example.foo'")
res = yield from session.call(u"example.foo")
print("example.foo() = {}".format(res))
print("done")
component = Component(
transports=u"ws://localhost:8080/auth_ws",
main=main,
realm=u"crossbardemo",
authentication={
u"wampcra": {
u"authid": u"bob",
u"authrole": "dynamic_authed",
u"secret": u"p4ssw0rd",
}
}
)
if __name__ == "__main__":
run([component])

View File

@ -1,22 +1,22 @@
-----BEGIN CERTIFICATE-----
MIIDlTCCAn2gAwIBAgIJAKkagqXmV6JSMA0GCSqGSIb3DQEBCwUAMGExCzAJBgNV
MIIDlTCCAn2gAwIBAgIJANOD1A80gDOaMA0GCSqGSIb3DQEBCwUAMGExCzAJBgNV
BAYTAkRFMRAwDgYDVQQIDAdCYXZhcmlhMREwDwYDVQQHDAhFcmxhbmdlbjEZMBcG
A1UECgwQQ3Jvc3NiYXIuaW8gR21iSDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTE3
MDMzMTAzMzY1MloXDTE3MDQzMDAzMzY1MlowYTELMAkGA1UEBhMCREUxEDAOBgNV
MDcwODIyNTMwN1oXDTE3MDgwNzIyNTMwN1owYTELMAkGA1UEBhMCREUxEDAOBgNV
BAgMB0JhdmFyaWExETAPBgNVBAcMCEVybGFuZ2VuMRkwFwYDVQQKDBBDcm9zc2Jh
ci5pbyBHbWJIMRIwEAYDVQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUA
A4IBDwAwggEKAoIBAQDfJWhr4HYhTLpm7ihDjaThKWt4Qs7uXjmexzY4Ke54ns52
W6viMVILZ8YHpWoJdmBuXycR3Rutw2TazNcuZKMlxWD2k7OCoJm44Z8MOqe2er1J
uZ8xVFr/Y/sAiyBgyZ/NRy/FIYcggjQywqmQ8OSzfYwUYFo7NYm5DqbQFLJYnxqA
p8tf2jEJURc1OIHDf2OvOxsb9OnqkhcyjVzpqfDX6cx904k5o9QP35I2yYmIsgen
cK6IYaUF3nl02xU8aTQefINIPYa+bPUWBzZNc5OBdOk4P601hesfz8X/UVakdmSW
HU3jbOA+yzAMeTpHvylyw7KIGGAf9Cv15wyAYpRlAgMBAAGjUDBOMB0GA1UdDgQW
BBQSt25uCominXIeBbBg+TxYGCSvwDAfBgNVHSMEGDAWgBQSt25uCominXIeBbBg
+TxYGCSvwDAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQBTUUSgsYUc
bt2ZObrifsOiXbu2oD5/yjVqAhxmG/EHAcW7g7zMQ9CJQiId9DS3IKWSwwsIL694
4qoAi29jPYGtanIlqY9aniGdj2Pspw7DkavQx8Nzx7T72daWdJKnmon9SN0wHczK
ZAbIn3LDqNbq/TCSV19zfq7fvFRsoMwp17icWZn4D5QijCsGqh+oAs3LNJoS63rJ
YvoC2kVDXizSkK+Z6Glut6WyywuFwsC/wLsVi0kAFNVd34MMKt2qAI9a8geefzHu
grGkWckNC6yoo0D81xEJGaIS8fUQW4DBOXk1T1xeu2NkNBFR+FGPYZhcd+ybk6hn
RafuEU5hCde/
A4IBDwAwggEKAoIBAQC7quJmYs/XsYvoHAWx9ip1HvtclcIoGtYXFI+WDiXBseXw
VGPKFIQ8LrE00AKvj5EnwQGKYY5FVUJz9MoWmR+YBVRNqL1FbCoN5v5y9GtetQur
XqCJ2Ur9jYgq+hqHnzQxC9OLmgqnHi7JWikJ5JY4kJXEI7Vl4WGFU6bOO+P1w8jR
9+vt5q/rB0AqSLjPnCLy7huhQPnV/5CxKXI0p1DDtoM3jNwU38Fxh3XHgkPc202z
UPAJDByHt38Ex2b82DWP4e2vHeNcYYHEBazwCD5+thcF61ZS+UAdy0a2mf9+56we
Ae031HW0xdqbl8dnprGjCpAsMg0kthDjUljlsdbXAgMBAAGjUDBOMB0GA1UdDgQW
BBTaVzim9tYIotxyKiZyuB+DpVDEBDAfBgNVHSMEGDAWgBTaVzim9tYIotxyKiZy
uB+DpVDEBDAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQCHhgZePon7
KEU01jRekD63u8EacyLP1xFCb0KmwV7T92tpe4mSduwHhyAYQT/8wPMaLA0qwU/D
Ltqj9CAaytGDpF5j5j4KErRzlckC6k5QttvllAVz9dQUNCO6ObXLKBsINfb9+c3/
zZ3U5yvOB6cdAfUHTx6uF/SOS9+uy1Uj7wsiP7+gqclPNKyst9qBUipXLEws4WHS
cZBCKZgxQJvQ+taCnukWZ/Xq5LZXl5TeCn8i1ioYr1s1hvaEYiyStHYztv+yWt+Q
lYPDzNlRdwe/GP89ffXIAtgzwssM6tFVngpvocDQtCNl0i8aiqNrKnEEdLLTEKt2
sD6AP5441/ZU
-----END CERTIFICATE-----

View File

@ -1,28 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDfJWhr4HYhTLpm
7ihDjaThKWt4Qs7uXjmexzY4Ke54ns52W6viMVILZ8YHpWoJdmBuXycR3Rutw2Ta
zNcuZKMlxWD2k7OCoJm44Z8MOqe2er1JuZ8xVFr/Y/sAiyBgyZ/NRy/FIYcggjQy
wqmQ8OSzfYwUYFo7NYm5DqbQFLJYnxqAp8tf2jEJURc1OIHDf2OvOxsb9Onqkhcy
jVzpqfDX6cx904k5o9QP35I2yYmIsgencK6IYaUF3nl02xU8aTQefINIPYa+bPUW
BzZNc5OBdOk4P601hesfz8X/UVakdmSWHU3jbOA+yzAMeTpHvylyw7KIGGAf9Cv1
5wyAYpRlAgMBAAECggEAZBYwPuqfsZHrrY02rA4VHRTp6jf0/r9uksQBkrFkD+AS
9M1rS/oZNZpnHTjZVcMtv3OYX4/QAWE5lFX6Ugm7j453TkGza2zDZkR0hRSt8tAc
pLvIJmUSp6hdhcHhVOg6b+V1DLNcG32rEfVz/tm9irBOKzmIXzRFQKbMtVoDIHHt
vqE5Kb6vv55PK8Zw9G4wZJFEZJIJAc0y1lAJ/8VDyZhPfslO4R2sQeVnKClLaaZx
Np/YWPqft06rM7aKXHXmlIFgmE7oPw0ac9C03qK3oSMDQX9JVnv+SvrGA/hkYQxS
b/jJ79nRfeOtfI1Y7QAKGnNVPQ6WEGkCINzsBliixQKBgQD849E0waWfwiaDiK23
I04Il36hus3KpeUYYa/QiwWR7/i9YtobOXVGnPACgwTeWg/4ocm+gfclwVYtE/lx
G+heDqN68Q0i9UozsGJ62AnzzmAk/NCBprJkLDrx4yug9EGmAQitiuSDtpogmNjR
iTB2Iwd5X8zK1c8Q924/5qRrEwKBgQDh4/J8Jtdya2LqvtfTHAHDTC8/+uog1B6L
xlQitJsgKvlDFpzkIlVmZJzMseRSHbZ5QxxaIvkTAdX4G3A5QCQByTxiMZaKhgV9
K/ZvDUq0ZupnGBRxDHnwoo+wuA03kej/lV/swuxqbx33j5wldLSpctrNm2T6Of91
6q9GV0C5pwKBgQDTdmiMldS5ty5fVJ32AraQpqVD9aF5b6kW7zCF0IoS0zgKnStG
EpDHQBnN7+LVTzgFrru06pZQYs99mDM/6pcud00qU4Cpl7S5biebEyWXUh3BMAnm
MA8Bhp6vf0cPKs/uUPUBsJ+Op9VPpp++7tmwH3BzhLEJdynTxSLSmjkoMwKBgQDa
GAYnfiznaO0RLn8ccV72W1kHUc+5MHPpievzsiJP9Y3CC1QhI1EG0j9dlqeV7OAf
xf7avAd1JYGb0YoRjG73Vn65Y50wU2N7TfXZlKC6+t53+RTLbTtSbPGEsr74FyU5
ltQNuMkfnfb17S7aLwq6y4yblNvyiAqwo4zkwhtY4wKBgQDBv/gfFoP13OurDlNy
MkYuKD4z/kfveymjI733DHDGAO8oIRcsQh8rLyb3nTiTy5Z1fiPxHwpozncfJI4v
GS7iBOtMRBVl8of+0G9l6HoIsYf9dGOlyS7+7u9cR/xs9Dr6tBbEdMiHOksTPmBU
mTaIeCChFvPw3enORePnxRrNyQ==
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQC7quJmYs/XsYvo
HAWx9ip1HvtclcIoGtYXFI+WDiXBseXwVGPKFIQ8LrE00AKvj5EnwQGKYY5FVUJz
9MoWmR+YBVRNqL1FbCoN5v5y9GtetQurXqCJ2Ur9jYgq+hqHnzQxC9OLmgqnHi7J
WikJ5JY4kJXEI7Vl4WGFU6bOO+P1w8jR9+vt5q/rB0AqSLjPnCLy7huhQPnV/5Cx
KXI0p1DDtoM3jNwU38Fxh3XHgkPc202zUPAJDByHt38Ex2b82DWP4e2vHeNcYYHE
BazwCD5+thcF61ZS+UAdy0a2mf9+56weAe031HW0xdqbl8dnprGjCpAsMg0kthDj
UljlsdbXAgMBAAECggEBAI1HOwMrFh7h6VpGZj8v+4yKU17C2xSIB23+q42ifugP
ffPsA+eohFguKneAtNTntrX/xaIgiRNG+S0ENnLGuZMfR/TiuAIgSJSItjtiJ8Z3
cx6CsUa1T5AL0VvRRtGcl5Tou0vYHnbq8PviJQzuiexxZF3xJ+lXv6u4atXNHnLv
sz423bo4vyM99SUstK8zhtbybdKNbwiUv9zai+QmBDE+nJP3y9heyKDyhnYJ6VoN
YjpM7CT6xVezOfQjAirgKraE6emFskY0zQPvo5FNO8PjXT96Za42qV+L2IimDtNz
y5NF7OioXslr7htdf6bOu8sPxauw89kjxieM3Q+ISXECgYEA7ALOE5ZLBM6/ZH2z
NUu8oYvX3xyGvJWe+gqr9wR2ekPAv/8wZ3gtBW8fA8DPsZGW2atV45DC5d5y5Hzt
wI82C/qhs7AAoKTP64dXOHBhDTrWTf8P1xW9ba6bFrCvbLUUcdbHfk0NKMeudQMm
uUzgAHmRSWLw87ybjrNagV5BlN8CgYEAy4/lU8XkxYbXuIpCyL/0AEQ3eiFi8z5V
g2WSJ5vL8e0vcoqoqMTjLAGFGIrVbDFcbze03NZQD6I+nMPhMWvcwax1B+Ao8REJ
8fikeXGiTI6/d/vSmkO2dNJxl5AolRLxR/NL+HF9ISI7PQXO3lsK6/PERYx+z+Qm
J+fb9uisxQkCgYBNWn4AnXnpEXpZjXl3NmOU7yjJz6e5l7CLc4P5alUKrbo3M4TB
5Pmcllcr+74XDQjMPwPfmkrG3Kn7iZbDTi0AsBzfsAgEnwAmyi0kKnEkzOwAlGC0
tkNn95qNTUux7YfN5/9qvgZaH1hmsam3giLJBQ9BZlRjDqZNlytKy4HjMQKBgQCh
fMNKRSjrBsEl9hFrHtScTYHRUqEkJSRDvBVJdNBDWY2ViaipUqCBrab/xEvxq+sP
lBrYQvB6ppSTWtXQligoQP8Kw2rXa1P/cOhSK7K8l1YQAmed1+/sF3Lj8Qow0Tk2
gi2Xp7jDy1tBWtU1EvbEHrtrDT0hAXa/m7Gl3AoLuQKBgQC0gmTd8mC1v/VctNsh
gJx+sR/6C/vcOuVmOZgCk/Ab95iNVvVGrBsUhIQhjk4euKzofnwFVRX62DxyoK7+
wWbYf+OIfUPCoZvqKcczw1E5grJAT3S9r3wzRNKzlejHqwxTkYTlt6ufmOkb/v/3
ExY9r8ru/aE9xMq79u1T+LRpmg==
-----END PRIVATE KEY-----

View File

@ -14,15 +14,17 @@ class MyAuthorizer(ApplicationSession):
print("MyAuthorizer: failed to register authorizer procedure ({})".format(e))
raise
def authorize(self, details, uri, action, options={}):
print("MyAuthorizer.authorize(uri='{}', action='{}')".format(uri, action))
def authorize(self, details, uri, action, options):
print("MyAuthorizer.authorize(uri='{}', action='{}', options='{}')".format(uri, action, options))
print("options:")
for k, v in options.items():
print(" {}: {}".format(k, v))
if False:
print("I allow everything.")
else:
if options.get(u"match", "") != u"exact":
if uri == u'com.foo.private':
return False
if options.get(u"match", "exact") != u"exact":
print("only exact-match subscriptions allowed")
return False
return True