update for the client tester

This commit is contained in:
root 2015-10-20 18:26:26 +02:00
parent 01f1663066
commit d1a9d82b5b
2 changed files with 92 additions and 61 deletions

View File

@ -1,66 +1,7 @@
from autobahn.twisted.wamp import ApplicationRunner
from autobahn.twisted.wamp import ApplicationSession
from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.util import sleep
from twisted.internet.defer import inlineCallbacks
import multiprocessing
from Crypto.PublicKey.pubkey import pubkey
import time
msg_queue=None
class ClientWamp:
class Publisher(ApplicationSession):
def __init__(self, config=None):
ApplicationSession.__init__(self, config)
print("component created")
@inlineCallbacks
def onJoin(self, details):
print("Publisher session ready")
while True:
if not msg_queue.empty():
msg=msg_queue.get()
self.publish(u'board.connection', msg)
yield sleep(1)
class Subscriber(ApplicationSession):
@inlineCallbacks
def onJoin(self, details):
print("Subscriber session ready")
def oncounter(count):
print("event received: {0}", count)
try:
yield self.subscribe(oncounter, u'board.connection')
print("subscribed to topic")
except Exception as e:
print("could not subscribe to topic: {0}".format(e))
def __init__(self,ip,port,realm):
self.ip=unicode(ip)
self.port=unicode(port)
self.realm=unicode(realm)
self._url = "ws://"+self.ip+":"+self.port+"/ws"
runner = ApplicationRunner(url=unicode(self._url), realm=self.realm)
global msg_queue
msg_queue = multiprocessing.Queue()
multi = multiprocessing.Process(target=runner.run, args=(self.Subscriber,))
multi.start()
mult2 = multiprocessing.Process(target=runner.run, args=(self.Publisher,))
mult2.start()
def send(self,msg):
msg_queue.put(msg)
from clientwamp import ClientWamp
from sys import stdin
c=ClientWamp('localhost','8181','s4t')
from sys import stdin
c.send('Hello!')
while True:
userinput = stdin.readline()

90
utils/clientwamp.py Normal file
View File

@ -0,0 +1,90 @@
from autobahn.twisted.wamp import ApplicationRunner
from autobahn.twisted.wamp import ApplicationSession
from twisted.internet.defer import inlineCallbacks
import multiprocessing
from autobahn.twisted.util import sleep
msg_queue=None
class Publisher(ApplicationSession):
@inlineCallbacks
def onJoin(self, details):
print("Publisher session ready")
self.publish(u'board.connection', 'counter')
'''
global msg_queue
while True:
if not msg_queue.empty():
msg=msg_queue.get()
self.publish(u'board.connection', msg)
yield sleep(1)
'''
class Subscriber(ApplicationSession):
@inlineCallbacks
def onJoin(self, details):
print("Subscriber session ready")
def oncounter(count):
print("event received: {0}", count)
try:
yield self.subscribe(oncounter, u'board.connection')
print("subscribed to topic")
except Exception as e:
print("could not subscribe to topic: {0}".format(e))
self.publish(u'board.connection', 'counter')
global msg_queue
while True:
if not msg_queue.empty():
msg=msg_queue.get()
self.publish(u'board.connection', msg)
yield sleep(0.01)
class PublisherClient:
def __init__(self,ip,port,realm):
self.ip=unicode(ip)
self.port=unicode(port)
self.realm=unicode(realm)
self._url = "ws://"+self.ip+":"+self.port+"/ws"
self.runner = ApplicationRunner(url=unicode(self._url), realm=self.realm,
#debug=True, debug_wamp=True, debug_app=True
)
def start(self):
# Pass start_reactor=False to all runner.run() calls
self.runner.run(Publisher, start_reactor=False)
class SubscriberClient:
def __init__(self,ip,port,realm):
self.ip=unicode(ip)
self.port=unicode(port)
self.realm=unicode(realm)
self._url = "ws://"+self.ip+":"+self.port+"/ws"
self.runner = ApplicationRunner(url=unicode(self._url), realm=self.realm,
#debug=True, debug_wamp=True, debug_app=True
)
def start(self):
# Pass start_reactor=False to all runner.run() calls
self.runner.run(Subscriber, start_reactor=False)
class ClientWamp:
def __init__(self,ip,port,realm):
server = SubscriberClient(ip,port,realm)
sendMessage = PublisherClient(ip,port,realm)
server.start()
sendMessage.start()
from twisted.internet import reactor
global msg_queue
msg_queue = multiprocessing.Queue()
multi = multiprocessing.Process(target=reactor.run, args=())
multi.start()
def send(self,msg):
msg_queue.put(msg)