From fca89ee0a0797d7e994c4d0c11c7746f7f136636 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Tue, 25 Mar 2014 17:07:15 -0700 Subject: [PATCH] Add a very basic functional test It starts the daemon with a simple config file and ensures that it spins up a node. A timeout is added to the zmq listener so that its run loop can be stopped by the 'stopped' flag. And the shutdown procedure for nodepool is altered so that it sets those flags and waits for those threads to join before proceeding. The previous method could occasionally cause assertion errors (from C, therefore core dumps) due to zmq concurrency issues. Change-Id: I7019a80c9dbf0396c8ddc874a3f4f0c2e977dcfa --- nodepool/fakeprovider.py | 2 +- nodepool/nodepool.py | 13 ++++++- nodepool/tests/__init__.py | 5 +-- nodepool/tests/fixtures/node.yaml | 46 +++++++++++++++++++++++++ nodepool/tests/test_nodepool.py | 57 +++++++++++++++++++++++++++++++ 5 files changed, 119 insertions(+), 4 deletions(-) create mode 100644 nodepool/tests/fixtures/node.yaml diff --git a/nodepool/fakeprovider.py b/nodepool/fakeprovider.py index 94586fec7..0a839c81a 100644 --- a/nodepool/fakeprovider.py +++ b/nodepool/fakeprovider.py @@ -77,7 +77,7 @@ class FakeList(object): self._list.append(s) t = threading.Thread(target=self._finish, name='FakeProvider create', - args=(s, 0.5, 'ACTIVE')) + args=(s, 0.1, 'ACTIVE')) t.start() return s diff --git a/nodepool/nodepool.py b/nodepool/nodepool.py index adcd858cc..211146234 100644 --- a/nodepool/nodepool.py +++ b/nodepool/nodepool.py @@ -137,6 +137,7 @@ class NodeUpdateListener(threading.Thread): threading.Thread.__init__(self, name='NodeUpdateListener') self.nodepool = nodepool self.socket = self.nodepool.zmq_context.socket(zmq.SUB) + self.socket.RCVTIMEO = 1000 event_filter = b"" self.socket.setsockopt(zmq.SUBSCRIBE, event_filter) self.socket.connect(addr) @@ -144,13 +145,19 @@ class NodeUpdateListener(threading.Thread): def run(self): while not self._stopped: - m = self.socket.recv().decode('utf-8') + try: + m = self.socket.recv().decode('utf-8') + except zmq.error.Again: + continue try: topic, data = m.split(None, 1) self.handleEvent(topic, data) except Exception: self.log.exception("Exception handling job:") + def stop(self): + self._stopped = True + def handleEvent(self, topic, data): self.log.debug("Received: %s %s" % (topic, data)) args = json.loads(data) @@ -842,6 +849,10 @@ class NodePool(threading.Thread): def stop(self): self._stopped = True + if self.config: + for z in self.config.zmq_publishers.values(): + z.listener.stop() + z.listener.join() if self.zmq_context: self.zmq_context.destroy() if self.apsched: diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py index f8664d5fb..60caf40ca 100644 --- a/nodepool/tests/__init__.py +++ b/nodepool/tests/__init__.py @@ -15,6 +15,7 @@ """Common utilities used in testing""" +import logging import MySQLdb import os import random @@ -31,7 +32,7 @@ class BaseTestCase(testtools.TestCase, testresources.ResourcedTestCase): def setUp(self): super(BaseTestCase, self).setUp() - test_timeout = os.environ.get('OS_TEST_TIMEOUT', 30) + test_timeout = os.environ.get('OS_TEST_TIMEOUT', 60) try: test_timeout = int(test_timeout) except ValueError: @@ -49,7 +50,7 @@ class BaseTestCase(testtools.TestCase, testresources.ResourcedTestCase): stderr = self.useFixture(fixtures.StringStream('stderr')).stream self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr)) - self.useFixture(fixtures.FakeLogger()) + self.useFixture(fixtures.FakeLogger(level=logging.DEBUG)) self.useFixture(fixtures.NestedTempfile()) diff --git a/nodepool/tests/fixtures/node.yaml b/nodepool/tests/fixtures/node.yaml new file mode 100644 index 000000000..821a07dff --- /dev/null +++ b/nodepool/tests/fixtures/node.yaml @@ -0,0 +1,46 @@ +script-dir: . +dburi: '{dburi}' + +cron: + check: '*/15 * * * *' + cleanup: '*/1 * * * *' + update-image: '14 2 * * *' + +zmq-publishers: + - tcp://localhost:8881 + +gearman-servers: + - host: localhost + +labels: + - name: fake-label + image: fake-label + min-ready: 1 + providers: + - name: fake-provider + +providers: + - name: fake-provider + keypair: 'if-present-use-this-keypair' + username: 'fake' + password: 'fake' + auth-url: 'fake' + project-id: 'fake' + max-servers: 96 + pool: 'fake' + networks: + - net-id: 'some-uuid' + rate: 0.0001 + images: + - name: fake-label + base-image: 'Fake Precise' + min-ram: 8192 + name-filter: 'Fake' + setup: prepare_node_devstack.sh + +targets: + - name: fake-target + jenkins: + url: https://jenkins.example.org/ + user: fake + apikey: fake diff --git a/nodepool/tests/test_nodepool.py b/nodepool/tests/test_nodepool.py index 6596a482f..7b9b01455 100644 --- a/nodepool/tests/test_nodepool.py +++ b/nodepool/tests/test_nodepool.py @@ -13,12 +13,69 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os +import tempfile +import threading +import time + from nodepool import tests from nodepool import nodedb +import nodepool.nodepool class TestNodepool(tests.DBTestCase): + def setup_config(self, filename): + super(TestNodepool, self).setUp() + configfile = os.path.join(os.path.dirname(tests.__file__), + 'fixtures', filename) + config = open(configfile).read() + (fd, path) = tempfile.mkstemp() + os.write(fd, config.format(dburi=self.dburi)) + os.close(fd) + return path + + def wait_for_threads(self): + whitelist = ['APScheduler', + 'MainThread', + 'NodePool', + 'NodeUpdateListener', + 'Gearman client connect', + 'Gearman client poll', + 'fake-provider', + 'fake-jenkins', + 'fake-target', + ] + + while True: + done = True + for t in threading.enumerate(): + if t.name not in whitelist: + done = False + if done: + return + time.sleep(0.1) + def test_db(self): db = nodedb.NodeDatabase(self.dburi) with db.getSession() as session: session.getNodes() + + def test_node(self): + """Test that an image and node are created""" + configfile = self.setup_config('node.yaml') + pool = nodepool.nodepool.NodePool(configfile) + pool.start() + time.sleep(2) + while True: + self.wait_for_threads() + with pool.getDB().getSession() as session: + nodes = session.getNodes(provider_name='fake-provider', + label_name='fake-label', + target_name='fake-target', + state=nodedb.READY) + if len(nodes) == 1: + break + nodes = session.getNodes() + time.sleep(1) + self.wait_for_threads() + pool.stop()