Add a very basic functional test

It starts the daemon with a simple config file and ensures that
it spins up a node.

A timeout is added to the zmq listener so that its run loop can
be stopped by the 'stopped' flag.  And the shutdown procedure
for nodepool is altered so that it sets those flags and waits
for those threads to join before proceeding.  The previous method
could occasionally cause assertion errors (from C, therefore
core dumps) due to zmq concurrency issues.

Change-Id: I7019a80c9dbf0396c8ddc874a3f4f0c2e977dcfa
This commit is contained in:
James E. Blair 2014-03-25 17:07:15 -07:00
parent 852c6b0b96
commit fca89ee0a0
5 changed files with 119 additions and 4 deletions

View File

@ -77,7 +77,7 @@ class FakeList(object):
self._list.append(s)
t = threading.Thread(target=self._finish,
name='FakeProvider create',
args=(s, 0.5, 'ACTIVE'))
args=(s, 0.1, 'ACTIVE'))
t.start()
return s

View File

@ -137,6 +137,7 @@ class NodeUpdateListener(threading.Thread):
threading.Thread.__init__(self, name='NodeUpdateListener')
self.nodepool = nodepool
self.socket = self.nodepool.zmq_context.socket(zmq.SUB)
self.socket.RCVTIMEO = 1000
event_filter = b""
self.socket.setsockopt(zmq.SUBSCRIBE, event_filter)
self.socket.connect(addr)
@ -144,13 +145,19 @@ class NodeUpdateListener(threading.Thread):
def run(self):
while not self._stopped:
m = self.socket.recv().decode('utf-8')
try:
m = self.socket.recv().decode('utf-8')
except zmq.error.Again:
continue
try:
topic, data = m.split(None, 1)
self.handleEvent(topic, data)
except Exception:
self.log.exception("Exception handling job:")
def stop(self):
self._stopped = True
def handleEvent(self, topic, data):
self.log.debug("Received: %s %s" % (topic, data))
args = json.loads(data)
@ -842,6 +849,10 @@ class NodePool(threading.Thread):
def stop(self):
self._stopped = True
if self.config:
for z in self.config.zmq_publishers.values():
z.listener.stop()
z.listener.join()
if self.zmq_context:
self.zmq_context.destroy()
if self.apsched:

View File

@ -15,6 +15,7 @@
"""Common utilities used in testing"""
import logging
import MySQLdb
import os
import random
@ -31,7 +32,7 @@ class BaseTestCase(testtools.TestCase, testresources.ResourcedTestCase):
def setUp(self):
super(BaseTestCase, self).setUp()
test_timeout = os.environ.get('OS_TEST_TIMEOUT', 30)
test_timeout = os.environ.get('OS_TEST_TIMEOUT', 60)
try:
test_timeout = int(test_timeout)
except ValueError:
@ -49,7 +50,7 @@ class BaseTestCase(testtools.TestCase, testresources.ResourcedTestCase):
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
self.useFixture(fixtures.FakeLogger())
self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
self.useFixture(fixtures.NestedTempfile())

46
nodepool/tests/fixtures/node.yaml vendored Normal file
View File

@ -0,0 +1,46 @@
script-dir: .
dburi: '{dburi}'
cron:
check: '*/15 * * * *'
cleanup: '*/1 * * * *'
update-image: '14 2 * * *'
zmq-publishers:
- tcp://localhost:8881
gearman-servers:
- host: localhost
labels:
- name: fake-label
image: fake-label
min-ready: 1
providers:
- name: fake-provider
providers:
- name: fake-provider
keypair: 'if-present-use-this-keypair'
username: 'fake'
password: 'fake'
auth-url: 'fake'
project-id: 'fake'
max-servers: 96
pool: 'fake'
networks:
- net-id: 'some-uuid'
rate: 0.0001
images:
- name: fake-label
base-image: 'Fake Precise'
min-ram: 8192
name-filter: 'Fake'
setup: prepare_node_devstack.sh
targets:
- name: fake-target
jenkins:
url: https://jenkins.example.org/
user: fake
apikey: fake

View File

@ -13,12 +13,69 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import tempfile
import threading
import time
from nodepool import tests
from nodepool import nodedb
import nodepool.nodepool
class TestNodepool(tests.DBTestCase):
def setup_config(self, filename):
super(TestNodepool, self).setUp()
configfile = os.path.join(os.path.dirname(tests.__file__),
'fixtures', filename)
config = open(configfile).read()
(fd, path) = tempfile.mkstemp()
os.write(fd, config.format(dburi=self.dburi))
os.close(fd)
return path
def wait_for_threads(self):
whitelist = ['APScheduler',
'MainThread',
'NodePool',
'NodeUpdateListener',
'Gearman client connect',
'Gearman client poll',
'fake-provider',
'fake-jenkins',
'fake-target',
]
while True:
done = True
for t in threading.enumerate():
if t.name not in whitelist:
done = False
if done:
return
time.sleep(0.1)
def test_db(self):
db = nodedb.NodeDatabase(self.dburi)
with db.getSession() as session:
session.getNodes()
def test_node(self):
"""Test that an image and node are created"""
configfile = self.setup_config('node.yaml')
pool = nodepool.nodepool.NodePool(configfile)
pool.start()
time.sleep(2)
while True:
self.wait_for_threads()
with pool.getDB().getSession() as session:
nodes = session.getNodes(provider_name='fake-provider',
label_name='fake-label',
target_name='fake-target',
state=nodedb.READY)
if len(nodes) == 1:
break
nodes = session.getNodes()
time.sleep(1)
self.wait_for_threads()
pool.stop()