Merge remote-tracking branch 'upstream/master' into rel_0_18_3

This commit is contained in:
Tobias Oberstein 2017-05-01 11:37:52 +02:00
commit f1eb092a5e
12 changed files with 386 additions and 68 deletions

View File

@ -316,3 +316,49 @@ class WebSocketOriginMatching(unittest.TestCase):
self.assertFalse(
_is_same_origin(_url_to_origin('null'), None, 80, [])
)
class WebSocketXForwardedFor(unittest.TestCase):
"""
Test that (only) a trusted X-Forwarded-For can replace the peer address.
"""
def setUp(self):
self.factory = WebSocketServerFactory()
self.factory.setProtocolOptions(
trustXForwardedFor=2
)
self.proto = WebSocketServerProtocol()
self.proto.transport = StringTransport()
self.proto.factory = self.factory
self.proto.failHandshake = Mock()
self.proto._connectionMade()
def tearDown(self):
for call in [
self.proto.autoPingPendingCall,
self.proto.autoPingTimeoutCall,
self.proto.openHandshakeTimeoutCall,
self.proto.closeHandshakeTimeoutCall,
]:
if call is not None:
call.cancel()
def test_trusted_addresses(self):
self.proto.data = b"\r\n".join([
b'GET /ws HTTP/1.1',
b'Host: www.example.com',
b'Origin: http://www.example.com',
b'Sec-WebSocket-Version: 13',
b'Sec-WebSocket-Extensions: permessage-deflate',
b'Sec-WebSocket-Key: tXAxWFUqnhi86Ajj7dRY5g==',
b'Connection: keep-alive, Upgrade',
b'Upgrade: websocket',
b'X-Forwarded-For: 1.2.3.4, 2.3.4.5, 111.222.33.44',
b'\r\n', # last string doesn't get a \r\n from join()
])
self.proto.consumeData()
self.assertEquals(
self.proto.peer, "2.3.4.5",
"The second address in X-Forwarded-For should have been picked as the peer address")

View File

@ -51,7 +51,7 @@ class IObjectSerializer(object):
@public
@abc.abstractproperty
def BINARY():
def BINARY(self):
"""
Flag (read-only) to indicate if serializer requires a binary clean
transport or if UTF8 transparency is sufficient.
@ -59,7 +59,7 @@ class IObjectSerializer(object):
@public
@abc.abstractmethod
def serialize(obj):
def serialize(self, obj):
"""
Serialize an object to a byte string.
@ -72,7 +72,7 @@ class IObjectSerializer(object):
@public
@abc.abstractmethod
def unserialize(payload):
def unserialize(self, payload):
"""
Unserialize objects from a byte string.
@ -93,21 +93,21 @@ class ISerializer(object):
@public
@abc.abstractproperty
def MESSAGE_TYPE_MAP():
def MESSAGE_TYPE_MAP(self):
"""
Mapping of WAMP message type codes to WAMP message classes.
"""
@public
@abc.abstractproperty
def SERIALIZER_ID():
def SERIALIZER_ID(self):
"""
The WAMP serialization format ID.
"""
@public
@abc.abstractmethod
def serialize(message):
def serialize(self, message):
"""
Serializes a WAMP message to bytes for sending over a transport.
@ -120,7 +120,7 @@ class ISerializer(object):
@public
@abc.abstractmethod
def unserialize(payload, isBinary):
def unserialize(self, payload, isBinary):
"""
Deserialize bytes from a transport and parse into WAMP messages.
@ -144,7 +144,7 @@ class IMessage(object):
@public
@abc.abstractproperty
def MESSAGE_TYPE():
def MESSAGE_TYPE(self):
"""
WAMP message type code.
"""
@ -166,7 +166,7 @@ class IMessage(object):
@public
@abc.abstractmethod
def serialize(serializer):
def serialize(self, serializer):
"""
Serialize this object into a wire level bytes representation and cache
the resulting bytes. If the cache already contains an entry for the given
@ -181,7 +181,7 @@ class IMessage(object):
@public
@abc.abstractmethod
def uncache():
def uncache(self):
"""
Resets the serialization cache for this message.
"""
@ -197,7 +197,7 @@ class ITransport(object):
@public
@abc.abstractmethod
def send(message):
def send(self, message):
"""
Send a WAMP message over the transport to the peer. If the transport is
not open, this raises :class:`autobahn.wamp.exception.TransportLost`.
@ -213,7 +213,7 @@ class ITransport(object):
@public
@abc.abstractmethod
def isOpen():
def isOpen(self):
"""
Check if the transport is open for messaging.
@ -223,7 +223,7 @@ class ITransport(object):
@public
@abc.abstractmethod
def close():
def close(self):
"""
Close the transport regularly. The transport will perform any
closing handshake if applicable. This should be used for any
@ -232,7 +232,7 @@ class ITransport(object):
@public
@abc.abstractmethod
def abort():
def abort(self):
"""
Abort the transport abruptly. The transport will be destroyed as
fast as possible, and without playing nice to the peer. This should
@ -242,7 +242,7 @@ class ITransport(object):
@public
@abc.abstractmethod
def get_channel_id():
def get_channel_id(self):
"""
Return the unique channel ID of the underlying transport. This is used to
mitigate credential forwarding man-in-the-middle attacks when running
@ -292,7 +292,7 @@ class ITransportHandler(object):
@public
@abc.abstractproperty
def transport():
def transport(self):
"""
When the transport this handler is attached to is currently open, this property
can be read from. The property should be considered read-only. When the transport
@ -301,7 +301,7 @@ class ITransportHandler(object):
@public
@abc.abstractmethod
def onOpen(transport):
def onOpen(self, transport):
"""
Callback fired when transport is open. May run asynchronously. The transport
is considered running and is_open() would return true, as soon as this callback
@ -313,7 +313,7 @@ class ITransportHandler(object):
@public
@abc.abstractmethod
def onMessage(message):
def onMessage(self, message):
"""
Callback fired when a WAMP message was received. May run asynchronously. The callback
should return or fire the returned deferred/future when it's done processing the message.
@ -325,7 +325,7 @@ class ITransportHandler(object):
@public
@abc.abstractmethod
def onClose(wasClean):
def onClose(self, wasClean):
"""
Callback fired when the transport has been closed.
@ -342,7 +342,7 @@ class ISession(object):
"""
@abc.abstractmethod
def __init__(config=None):
def __init__(self, config=None):
"""
:param config: Configuration for session.
@ -351,7 +351,7 @@ class ISession(object):
@public
@abc.abstractmethod
def onUserError(fail, msg):
def onUserError(self, fail, msg):
"""
This is called when we try to fire a callback, but get an
exception from user code -- for example, a registered publish
@ -372,14 +372,15 @@ class ISession(object):
@public
@abc.abstractmethod
def onConnect():
def onConnect(self):
"""
Callback fired when the transport this session will run over has been established.
"""
@public
@abc.abstractmethod
def join(realm,
def join(self,
realm,
authmethods=None,
authid=None,
authrole=None,
@ -393,7 +394,7 @@ class ISession(object):
@public
@abc.abstractmethod
def onChallenge(challenge):
def onChallenge(self, challenge):
"""
Callback fired when the peer demands authentication.
@ -405,7 +406,7 @@ class ISession(object):
@public
@abc.abstractmethod
def onJoin(details):
def onJoin(self, details):
"""
Callback fired when WAMP session has been established.
@ -417,7 +418,7 @@ class ISession(object):
@public
@abc.abstractmethod
def leave(reason=None, message=None):
def leave(self, reason=None, message=None):
"""
Actively close this WAMP session.
@ -434,7 +435,7 @@ class ISession(object):
@public
@abc.abstractmethod
def onLeave(details):
def onLeave(self, details):
"""
Callback fired when WAMP session has is closed
@ -444,35 +445,35 @@ class ISession(object):
@public
@abc.abstractmethod
def disconnect():
def disconnect(self):
"""
Close the underlying transport.
"""
@public
@abc.abstractmethod
def onDisconnect():
def onDisconnect(self):
"""
Callback fired when underlying transport has been closed.
"""
@public
@abc.abstractmethod
def is_connected():
def is_connected(self):
"""
Check if the underlying transport is connected.
"""
@public
@abc.abstractmethod
def is_attached():
def is_attached(self):
"""
Check if the session has currently joined a realm.
"""
@public
@abc.abstractmethod
def set_payload_codec(payload_codec):
def set_payload_codec(self, payload_codec):
"""
Set a payload codec on the session. To remove a previously set payload codec,
set the codec to ``None``.
@ -487,7 +488,7 @@ class ISession(object):
@public
@abc.abstractmethod
def get_payload_codec():
def get_payload_codec(self):
"""
Get the current payload codec (if any) for the session.
@ -500,7 +501,7 @@ class ISession(object):
@public
@abc.abstractmethod
def define(exception, error=None):
def define(self, exception, error=None):
"""
Defines an exception for a WAMP error in the context of this WAMP session.
@ -514,7 +515,7 @@ class ISession(object):
@public
@abc.abstractmethod
def call(procedure, *args, **kwargs):
def call(self, procedure, *args, **kwargs):
"""
Call a remote procedure.
@ -551,7 +552,7 @@ class ISession(object):
@public
@abc.abstractmethod
def register(endpoint, procedure=None, options=None):
def register(self, endpoint, procedure=None, options=None):
"""
Register a procedure for remote calling.
@ -587,7 +588,7 @@ class ISession(object):
@public
@abc.abstractmethod
def publish(topic, *args, **kwargs):
def publish(self, topic, *args, **kwargs):
"""
Publish an event to a topic.
@ -624,7 +625,7 @@ class ISession(object):
@public
@abc.abstractmethod
def subscribe(handler, topic=None, options=None):
def subscribe(self, handler, topic=None, options=None):
"""
Subscribe to a topic for receiving events.
@ -664,7 +665,7 @@ class ISession(object):
class IAuthenticator(object):
@abc.abstractmethod
def on_challenge(session, challenge):
def on_challenge(self, session, challenge):
"""
"""
@ -690,7 +691,7 @@ class IPayloadCodec(object):
@public
@abc.abstractmethod
def encode(is_originating, uri, args=None, kwargs=None):
def encode(self, is_originating, uri, args=None, kwargs=None):
"""
Encodes application payload.
@ -715,7 +716,7 @@ class IPayloadCodec(object):
@public
@abc.abstractmethod
def decode(is_originating, uri, encoded_payload):
def decode(self, is_originating, uri, encoded_payload):
"""
Decode application payload.

View File

@ -1313,9 +1313,14 @@ class ApplicationSession(BaseSession):
if "_wampuris" in proc.__dict__:
for pat in proc.__dict__["_wampuris"]:
if pat.is_handler():
uri = pat.uri()
subopts = options or pat.subscribe_options()
on_replies.append(_subscribe(handler, proc, uri, subopts))
_uri = pat.uri()
subopts = pat.options or options
if subopts is None:
if pat.uri_type == uri.Pattern.URI_TYPE_WILDCARD:
subopts = types.SubscribeOptions(match=u"wildcard")
else:
subopts = types.SubscribeOptions(match=u"exact")
on_replies.append(_subscribe(handler, proc, _uri, subopts))
# XXX needs coverage
return txaio.gather(on_replies, consume_exceptions=True)
@ -1473,8 +1478,9 @@ class ApplicationSession(BaseSession):
if "_wampuris" in proc.__dict__:
for pat in proc.__dict__["_wampuris"]:
if pat.is_endpoint():
uri = pat.uri()
on_replies.append(_register(endpoint, proc, uri, options))
_uri = pat.uri()
regopts = pat.options or options
on_replies.append(_register(endpoint, proc, _uri, regopts))
# XXX neds coverage
return txaio.gather(on_replies, consume_exceptions=True)

View File

@ -27,7 +27,7 @@
from __future__ import absolute_import
from autobahn import wamp
from autobahn.wamp.uri import Pattern
from autobahn.wamp.uri import Pattern, RegisterOptions, SubscribeOptions
import unittest2 as unittest
@ -141,6 +141,24 @@ class TestDecorators(unittest.TestCase):
self.assertEqual(update._wampuris[0].uri(), u"com.myapp.<category:string>.<cid:int>.update")
self.assertEqual(update._wampuris[0]._type, Pattern.URI_TYPE_WILDCARD)
@wamp.register(u"com.myapp.circle.<name:string>",
RegisterOptions(match=u"wildcard", details_arg="details"))
def circle(name=None, details=None):
""" Do nothing. """
self.assertTrue(hasattr(circle, '_wampuris'))
self.assertTrue(type(circle._wampuris) == list)
self.assertEqual(len(circle._wampuris), 1)
self.assertIsInstance(circle._wampuris[0], Pattern)
self.assertIsInstance(circle._wampuris[0].options, RegisterOptions)
self.assertEqual(circle._wampuris[0].options.match, u"wildcard")
self.assertEqual(circle._wampuris[0].options.details_arg, "details")
self.assertTrue(circle._wampuris[0].is_endpoint())
self.assertFalse(circle._wampuris[0].is_handler())
self.assertFalse(circle._wampuris[0].is_exception())
self.assertEqual(circle._wampuris[0].uri(), u"com.myapp.circle.<name:string>")
self.assertEqual(circle._wampuris[0]._type, Pattern.URI_TYPE_WILDCARD)
def test_decorate_handler(self):
@wamp.subscribe(u"com.myapp.on_shutdown")
@ -185,6 +203,24 @@ class TestDecorators(unittest.TestCase):
self.assertEqual(on_update._wampuris[0].uri(), u"com.myapp.<category:string>.<cid:int>.on_update")
self.assertEqual(on_update._wampuris[0]._type, Pattern.URI_TYPE_WILDCARD)
@wamp.subscribe(u"com.myapp.on.<event:string>",
SubscribeOptions(match=u"wildcard", details_arg="details"))
def on_event(event=None, details=None):
""" Do nothing. """
self.assertTrue(hasattr(on_event, '_wampuris'))
self.assertTrue(type(on_event._wampuris) == list)
self.assertEqual(len(on_event._wampuris), 1)
self.assertIsInstance(on_event._wampuris[0], Pattern)
self.assertIsInstance(on_event._wampuris[0].options, SubscribeOptions)
self.assertEqual(on_event._wampuris[0].options.match, u"wildcard")
self.assertEqual(on_event._wampuris[0].options.details_arg, "details")
self.assertFalse(on_event._wampuris[0].is_endpoint())
self.assertTrue(on_event._wampuris[0].is_handler())
self.assertFalse(on_event._wampuris[0].is_exception())
self.assertEqual(on_event._wampuris[0].uri(), u"com.myapp.on.<event:string>")
self.assertEqual(on_event._wampuris[0]._type, Pattern.URI_TYPE_WILDCARD)
def test_decorate_exception(self):
@wamp.error(u"com.myapp.error")

View File

@ -28,10 +28,11 @@
from __future__ import absolute_import
import re
import six
from autobahn.util import public
from autobahn.wamp.types import SubscribeOptions
from autobahn.wamp.types import RegisterOptions, SubscribeOptions
__all__ = (
'Pattern',
@ -132,7 +133,7 @@ class Pattern(object):
This pattern is stricter than a general WAMP URI component since a valid Python identifier is required.
"""
def __init__(self, uri, target):
def __init__(self, uri, target, options=None):
"""
:param uri: The URI or URI pattern, e.g. ``"com.myapp.product.<product:int>.update"``.
@ -141,11 +142,20 @@ class Pattern(object):
:param target: The target for this pattern: a procedure endpoint (a callable),
an event handler (a callable) or an exception (a class).
:type target: callable or obj
:param options: An optional options object
:type options: None or RegisterOptions or SubscribeOptions
"""
assert(type(uri) == six.text_type)
assert(target in [Pattern.URI_TARGET_ENDPOINT,
Pattern.URI_TARGET_HANDLER,
Pattern.URI_TARGET_EXCEPTION])
if target == Pattern.URI_TARGET_ENDPOINT:
assert(options is None or type(options) == RegisterOptions)
elif target == Pattern.URI_TARGET_HANDLER:
assert(options is None or type(options) == SubscribeOptions)
else:
options = None
components = uri.split('.')
pl = []
@ -207,6 +217,29 @@ class Pattern(object):
self._names = None
self._uri = uri
self._target = target
self._options = options
@public
@property
def options(self):
"""
Returns the Options instance (if present) for this pattern.
:return: None or the Options instance
:rtype: None or RegisterOptions or SubscribeOptions
"""
return self._options
@public
@property
def uri_type(self):
"""
Returns the URI type of this pattern
:return:
:rtype: Pattern.URI_TYPE_EXACT, Pattern.URI_TYPE_PREFIX or Pattern.URI_TYPE_WILDCARD
"""
return self._type
@public
def uri(self):
@ -218,12 +251,6 @@ class Pattern(object):
"""
return self._uri
def subscribe_options(self):
if self._type == Pattern.URI_TYPE_WILDCARD:
return SubscribeOptions(match=u"wildcard")
else:
return SubscribeOptions(match=u"exact")
def match(self, uri):
"""
Match the given (fully qualified) URI according to this pattern
@ -282,29 +309,41 @@ class Pattern(object):
@public
def register(uri):
def register(uri, options=None):
"""
Decorator for WAMP procedure endpoints.
:param uri:
:type uri: str
:param options:
:type options: None or RegisterOptions
"""
def decorate(f):
assert(callable(f))
if not hasattr(f, '_wampuris'):
f._wampuris = []
f._wampuris.append(Pattern(uri, Pattern.URI_TARGET_ENDPOINT))
f._wampuris.append(Pattern(uri, Pattern.URI_TARGET_ENDPOINT, options))
return f
return decorate
@public
def subscribe(uri):
def subscribe(uri, options=None):
"""
Decorator for WAMP event handlers.
:param uri:
:type uri: str
:param options:
:type options: None or SubscribeOptions
"""
def decorate(f):
assert(callable(f))
if not hasattr(f, '_wampuris'):
f._wampuris = []
f._wampuris.append(Pattern(uri, Pattern.URI_TARGET_HANDLER))
f._wampuris.append(Pattern(uri, Pattern.URI_TARGET_HANDLER, options))
return f
return decorate

View File

@ -46,7 +46,8 @@ class IWebSocketServerChannelFactory(object):
"""
@abc.abstractmethod
def __init__(url=None,
def __init__(self,
url=None,
protocols=None,
server=None,
headers=None,
@ -73,7 +74,8 @@ class IWebSocketServerChannelFactory(object):
@public
@abc.abstractmethod
def setSessionParameters(url=None,
def setSessionParameters(self,
url=None,
protocols=None,
server=None,
headers=None,
@ -101,7 +103,8 @@ class IWebSocketServerChannelFactory(object):
@public
@abc.abstractmethod
def setProtocolOptions(versions=None,
def setProtocolOptions(self,
versions=None,
webStatus=None,
utf8validateIncoming=None,
maskServerFrames=None,
@ -123,7 +126,8 @@ class IWebSocketServerChannelFactory(object):
flashSocketPolicy=None,
allowedOrigins=None,
allowNullOrigin=False,
maxConnections=None):
maxConnections=None,
trustXForwardedFor=0):
"""
Set WebSocket protocol options used as defaults for new protocol instances.
@ -198,6 +202,9 @@ class IWebSocketServerChannelFactory(object):
:param maxConnections: Maximum number of concurrent connections. Set to `0` to disable (default: `0`).
:type maxConnections: int or None
:param trustXForwardedFor: Number of trusted web servers in front of this server that add their own X-Forwarded-For header (default: `0`)
:type trustXForwardedFor: int
"""
@public
@ -218,7 +225,8 @@ class IWebSocketClientChannelFactory(object):
"""
@abc.abstractmethod
def __init__(url=None,
def __init__(self,
url=None,
origin=None,
protocols=None,
useragent=None,
@ -253,7 +261,8 @@ class IWebSocketClientChannelFactory(object):
@public
@abc.abstractmethod
def setSessionParameters(url=None,
def setSessionParameters(self,
url=None,
origin=None,
protocols=None,
useragent=None,
@ -285,7 +294,8 @@ class IWebSocketClientChannelFactory(object):
@public
@abc.abstractmethod
def setProtocolOptions(version=None,
def setProtocolOptions(self,
version=None,
utf8validateIncoming=None,
acceptMaskedServerFrames=None,
maskClientFrames=None,

View File

@ -528,7 +528,8 @@ class WebSocketProtocol(object):
'allowedOrigins',
'allowedOriginsPatterns',
'allowNullOrigin',
'maxConnections']
'maxConnections',
'trustXForwardedFor']
"""
Configuration attributes specific to servers.
"""
@ -2491,6 +2492,13 @@ class WebSocketServerProtocol(WebSocketProtocol):
except Exception as e:
return self.failHandshake("Error during parsing of HTTP status line / request headers : {0}".format(e))
# replace self.peer if the x-forwarded-for header is present and trusted
#
if 'x-forwarded-for' in self.http_headers and self.trustXForwardedFor:
addresses = [x.strip() for x in self.http_headers['x-forwarded-for'].split(',')]
trusted_addresses = addresses[-self.trustXForwardedFor:]
self.peer = trusted_addresses[0]
# validate WebSocket opening handshake client request
#
self.log.debug(
@ -3179,6 +3187,9 @@ class WebSocketServerFactory(WebSocketFactory):
# maximum number of concurrent connections
self.maxConnections = 0
# number of trusted web servers in front of this server
self.trustXForwardedFor = 0
def setProtocolOptions(self,
versions=None,
webStatus=None,
@ -3202,7 +3213,8 @@ class WebSocketServerFactory(WebSocketFactory):
flashSocketPolicy=None,
allowedOrigins=None,
allowNullOrigin=False,
maxConnections=None):
maxConnections=None,
trustXForwardedFor=None):
"""
Implements :func:`autobahn.websocket.interfaces.IWebSocketServerChannelFactory.setProtocolOptions`
"""
@ -3285,6 +3297,11 @@ class WebSocketServerFactory(WebSocketFactory):
assert(maxConnections >= 0)
self.maxConnections = maxConnections
if trustXForwardedFor is not None and trustXForwardedFor != self.trustXForwardedFor:
assert(type(trustXForwardedFor) in six.integer_types)
assert(trustXForwardedFor >= 0)
self.trustXForwardedFor = trustXForwardedFor
def getConnectionCount(self):
"""
Get number of currently connected clients.

View File

@ -529,6 +529,7 @@ Server-Only Options
- flashSocketPolicy: the actual flash policy to serve (default one allows everything)
- allowedOrigins: a list of origins to allow, with embedded `*`'s for wildcards; these are turned into regular expressions (e.g. `https://*.example.com:443` becomes `^https://.*\.example\.com:443$`). When doing the matching, the origin is **always** of the form `scheme://host:port` with an explicit port. By default, we match with `*` (that is, anything). To match all subdomains of `example.com` on any scheme and port, you'd need `*://*.example.com:*`
- maxConnections: total concurrent connections allowed (default 0, unlimited)
- trustXForwardedFor: number of trusted web servers (reverse proxies) in front of this server which set the X-Forwarded-For header
Client-Only Options

View File

@ -6,6 +6,15 @@
"realms": [
{
"name": "crossbardemo",
"store": {
"type": "memory",
"event-history": [
{
"uri": "com.example.history",
"limit": 100
}
]
},
"roles": [
{
"name": "anonymous",

View File

@ -0,0 +1,23 @@
This shows how to use "retained" events as well as "event history".
The "backend.py" does some publishing to two different topics:
"com.example.history" and "com.example.no_history_here". The former
has event-history enabled (see the example router's config in the
"crossbardemo" realm).
When "frontend.py" attaches and subscribes to *either* of the two
topics, it will immediately receive the latest event.
After that, the WAMP Meta API is used to retrieve the event-history
from the "com.example.history" topic.
Things to try:
- run backend.py, then run frontend.py in a new shell
- run backend.py, kill it, then run frontend.py
- with frontend.py running, kill and re-run backend.py several times
- ...then try starting frontend.py again (which will show more events
in the history

View File

@ -0,0 +1,59 @@
###############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) Crossbar.io Technologies GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################
from os import environ
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.wamp import Session, ApplicationRunner
from autobahn.twisted.util import sleep
from autobahn.wamp.types import PublishOptions
class Component(Session):
"""
An application component calling the different backend procedures.
"""
@inlineCallbacks
def onJoin(self, details):
print("session attached {}".format(details))
for topic in [u"com.example.history", u"com.example.no_history_here"]:
print("publishing '{}' as retained event".format(topic))
pub = yield self.publish(
topic, "some data, topic was '{}'".format(topic),
options=PublishOptions(retain=True, acknowledge=True),
)
print("published: {}".format(pub))
if __name__ == '__main__':
runner = ApplicationRunner(
environ.get("AUTOBAHN_DEMO_ROUTER", u"ws://127.0.0.1:8080/auth_ws"),
u"crossbardemo",
)
runner.run(Component)

View File

@ -0,0 +1,71 @@
###############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) Crossbar.io Technologies GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################
from os import environ
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.wamp import Session, ApplicationRunner
from autobahn.twisted.util import sleep
from autobahn.wamp.types import SubscribeOptions
class Component(Session):
"""
An application component calling the different backend procedures.
"""
@inlineCallbacks
def onJoin(self, details):
print("session attached {}".format(details))
def got_event(*args, **kw):
print("got_event(): args={}, kwargs={}".format(args, kw))
# note: we're relying on 'com.example.history' (the one with
# event-history enabled) being last so that "pub" has the
# right ID for wamp.subscription.get_events after the loop
for topic in [u"com.example.no_history_here", u"com.example.history"]:
print("subscribing to '{}'".format(topic))
pub = yield self.subscribe(
got_event, topic,
options=SubscribeOptions(get_retained=True),
)
print("id={}".format(pub.id))
events = yield self.call(u"wamp.subscription.get_events", pub.id)
print("Using the WAMP Meta API:")
print("wamp.subscription.get_events {}: {}".format(pub.id, len(events)))
for event in events:
print(" {event[timestamp]} {event[topic]} args={event[args]} kwargs={event[kwargs]}".format(event=event))
if __name__ == '__main__':
runner = ApplicationRunner(
environ.get("AUTOBAHN_DEMO_ROUTER", u"ws://127.0.0.1:8080/auth_ws"),
u"crossbardemo",
)
runner.run(Component)