From df6930e2e03aed3f6f9b479d4773627ff15b3f87 Mon Sep 17 00:00:00 2001 From: Min Pae Date: Fri, 23 Jan 2015 14:31:34 -0800 Subject: [PATCH] Implemented publisher API and multi-process worker Pulled in runner/API and runner/worker from Endre's fork https://github.com/ekarlso/cue/tree/runner with a few modifications. worker-based engines are not being supported with the initial release of cue, and zookeeper is being used by default as both the persistence and jobboard backend. Change-Id: Id05f1a39ebaa91061145f5fc6d069a3c8e154f66 --- .gitignore | 1 + Vagrantfile | 2 +- cue/cmd/monitor.py | 70 -------- cue/cmd/worker.py | 60 ++++--- cue/taskflow/__init__.py | 53 ++++++ cue/taskflow/client.py | 261 ++++++++++++++++++++++++++++++ cue/taskflow/service.py | 163 +++++++++++++++++++ cue/tests/taskflow/__init__.py | 0 cue/tests/taskflow/test_client.py | 85 ++++++++++ etc/cue/worker.conf.sample | 57 +++++++ setup.cfg | 1 - test-requirements.txt | 2 +- 12 files changed, 659 insertions(+), 96 deletions(-) delete mode 100644 cue/cmd/monitor.py create mode 100644 cue/taskflow/client.py create mode 100644 cue/taskflow/service.py create mode 100644 cue/tests/taskflow/__init__.py create mode 100644 cue/tests/taskflow/test_client.py create mode 100644 etc/cue/worker.conf.sample diff --git a/.gitignore b/.gitignore index 8854bd14..af2848ac 100644 --- a/.gitignore +++ b/.gitignore @@ -54,6 +54,7 @@ coverage.xml # Sphinx documentation doc/build doc/source/api +doc/source/autoindex.rst # PyBuilder target/ diff --git a/Vagrantfile b/Vagrantfile index b2b4eec3..db4ec3fb 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -84,6 +84,6 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| # Initialize project and environment config.vm.provision "shell", inline: "pushd /vagrant && tox ; true" config.vm.provision "shell", inline: "source /vagrant/.tox/py27/bin/activate ; pushd /vagrant && python setup.py develop" - config.vm.provision "shell", inline: "echo 'source /vagrant/.tox/py27/bin/activate' >> ~root/.profile" + #config.vm.provision "shell", inline: "echo 'source /vagrant/.tox/py27/bin/activate' >> ~root/.profile" end diff --git a/cue/cmd/monitor.py b/cue/cmd/monitor.py deleted file mode 100644 index ef917591..00000000 --- a/cue/cmd/monitor.py +++ /dev/null @@ -1,70 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import sys -import time - -import oslo.config.cfg as cfg -from oslo_log import log -import taskflow.jobs.backends as job_backends -import taskflow.persistence.backends as persistence_backends - -import cue.common.service as cue_service - -PERSISTENCE_BACKEND_CONF = { - "connection": "zookeeper", -} - -JOB_BACKEND_CONF = { - "board": "zookeeper", -} - -CONF = cfg.CONF - - -def main(): - cue_service.prepare_service(sys.argv) - - LOG = log.getLogger(__name__) - - with persistence_backends.backend( - PERSISTENCE_BACKEND_CONF.copy() - ) as persistence: - - with job_backends.backend( - 'tutorial_simple', - {"board": "zookeeper", - "path": "/taskflow/jobs/tutorial_simple" - }, - persistence=persistence - ) as board_simple: - - with job_backends.backend( - 'tutorial_conduct', - {"board": "zookeeper", - "path": "/taskflow/jobs/tutorial_conduct" - }, - persistence=persistence - ) as board_conduct: - - while True: - job_count = board_simple.job_count - job_count += board_conduct.job_count - LOG.info("%d outstanding jobs" % (job_count)) - time.sleep(1) - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/cue/cmd/worker.py b/cue/cmd/worker.py index f37c9f76..c0672295 100644 --- a/cue/cmd/worker.py +++ b/cue/cmd/worker.py @@ -14,38 +14,52 @@ # under the License. +import logging import sys -from taskflow.conductors import single_threaded -from taskflow.jobs import backends as job_backends -from taskflow.persistence import backends as persistence_backends +import eventlet +import oslo.config.cfg as cfg +import oslo_log.log as log -PERSISTENCE_BACKEND_CONF = { - #"connection": "mysql+pymysql://taskflow:taskflow@localhost/taskflow", - "connection": "zookeeper", -} +from cue.common.i18n import _LI # noqa +import cue.common.service as cue_service +import cue.openstack.common.service as os_service +import cue.taskflow.service as tf_service -JOB_BACKEND_CONF = { - "board": "zookeeper", - "path": "/taskflow/jobs/tutorial_conduct", -} + +eventlet.monkey_patch(os=False) + + +WORKER_OPTS = [ + cfg.IntOpt('count', + help="Number of worker processes to spawn", + default=10) +] + +opt_group = cfg.OptGroup( + name='worker', + title='Options for cue worker' +) + +cfg.CONF.register_group(opt_group) +cfg.CONF.register_opts(WORKER_OPTS, group=opt_group) def main(): - with persistence_backends.backend( - PERSISTENCE_BACKEND_CONF.copy() - ) as persistence: + # Initialize environment + CONF = cfg.CONF + cue_service.prepare_service(sys.argv) - with job_backends.backend( - 'tutorial_conduct', - JOB_BACKEND_CONF.copy(), - persistence=persistence - ) as board: + # Log configuration and other startup information + LOG = log.getLogger(__name__) + LOG.info(_LI("Starting cue workers")) + LOG.info(_LI("Configuration:")) + CONF.log_opt_values(LOG, logging.INFO) - conductor = single_threaded.SingleThreadedConductor( - "conductor name", board, persistence, engine='serial') - - conductor.run() + cue_worker = tf_service.ConductorService.create("cue-worker") + #cue_worker.start() + launcher = os_service.launch(service=cue_worker, workers=CONF.worker.count) + launcher.wait() if __name__ == "__main__": diff --git a/cue/taskflow/__init__.py b/cue/taskflow/__init__.py index e69de29b..f12c3537 100644 --- a/cue/taskflow/__init__.py +++ b/cue/taskflow/__init__.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from oslo.config import cfg + + +CONF = cfg.CONF + + +TF_OPTS = [ + cfg.StrOpt('persistence_connection', + help="Persistence connection.", + default=None), + + cfg.StrOpt('zk_hosts', + help="Zookeeper jobboard hosts.", + default="localhost"), + + cfg.StrOpt('zk_path', + help="Zookeeper path for jobs.", + default='/cue/taskflow'), + + cfg.IntOpt('zk_timeout', + help="Zookeeper operations timeout.", + default=10), + + cfg.StrOpt('jobboard_name', + help="Board name.", + default='cue'), + + cfg.StrOpt('engine_type', + help="Engine type.", + default='serial'), +] + +opt_group = cfg.OptGroup( + name='taskflow', + title='Options for taskflow.' +) + +CONF.register_group(opt_group) +CONF.register_opts(TF_OPTS, group='taskflow') diff --git a/cue/taskflow/client.py b/cue/taskflow/client.py new file mode 100644 index 00000000..e72d360f --- /dev/null +++ b/cue/taskflow/client.py @@ -0,0 +1,261 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import contextlib +import uuid + +from oslo.config import cfg +from oslo.utils import uuidutils +from six.moves import urllib_parse +import taskflow.engines as engines +import taskflow.jobs.backends as job_backends +import taskflow.persistence.backends as persistence_backends +import taskflow.persistence.logbook as logbook + + +def _make_conf(backend_uri): + """A helper function for generating persistence backend configuration. + + This function takes a backend configuration as a URI of the form + :///. + + :param backend_uri: URI for backend connection + :return: A configuration dictionary for use with + taskflow.persistence.backends + """ + parsed_url = urllib_parse.urlparse(backend_uri) + backend_type = parsed_url.scheme.lower() + if not backend_type: + raise ValueError("Unknown backend type for uri: %s" % (backend_type)) + if backend_type in ('file', 'dir'): + conf = { + 'path': parsed_url.path, + 'connection': backend_uri, + } + elif backend_type in ('zookeeper',): + conf = { + 'path': parsed_url.path, + 'hosts': parsed_url.netloc, + 'connection': backend_uri, + } + else: + conf = { + 'connection': backend_uri, + } + return conf + + +class Client(object): + """An abstraction for interacting with Taskflow + + This class provides an abstraction for Taskflow to expose a simpler + interface for posting jobs to Taskflow Jobboards than what is provided + out of the box with Taskflow. + """ + + def __init__(self, client_name, board_name=None, persistence=None, + jobboard=None, **kwargs): + """Constructor for Client class + + :param client_name: Name of the client interacting with the jobboard + :param board_name: Name of the jobboard + :param persistence: A persistence backend instance to be used in lieu + of auto-creating a backend instance based on + configuration parameters + :param jobboard: A jobboard backend instance to be used in lieu of + auto-creating a backend instance based on + configuration parameters + :param kwargs: Any keyword arguments to be passed forward to + persistence and job backend constructors + """ + super(Client, self).__init__() + + if jobboard is None and board_name is None: + raise AttributeError("board_name must be supplied " + "if a jobboard is None") + + self._client_name = client_name + + self._persistence = persistence or Client.persistence(**kwargs) + + self._jobboard = jobboard or Client.jobboard(board_name, + None, + self._persistence, + **kwargs) + + def __del__(self): + """Destructor for Client class.""" + if self._jobboard is not None: + self._jobboard.close() + if self._persistence is not None: + self._persistence.close() + + @classmethod + def create(cls, client_name, board_name=None, persistence=None, + jobboard=None, **kwargs): + """Factory method for creating a Client instance + + :param client_name: Name of the client interacting with the jobboard + :param board_name: Name of the jobboard + :param persistence: A persistence backend instance to be used in lieu + of auto-creating a backend instance based on + configuration parameters + :param jobboard: A jobboard backend instance to be used in lieu of + auto-creating a backend instance based on + configuration parameters + :param kwargs: Any keyword arguments to be passed forward to + persistence and job backend constructors + :return: A :class:`.Client` instance. + """ + return cls(client_name, board_name=board_name, persistence=persistence, + jobboard=jobboard, **kwargs) + + @staticmethod + def persistence(conf=None, **kwargs): + """Factory method for creating a persistence backend instance + + :param conf: Configuration parameters for the persistence backend. If + no conf is provided, zookeeper configuration parameters + for the job backend will be used to configure the + persistence backend. + :param kwargs: Keyword arguments to be passed forward to the + persistence backend constructor + :return: A persistence backend instance. + """ + if conf is None: + connection = cfg.CONF.taskflow.persistence_connection + if connection is None: + connection = ("zookeeper://%s/%s" + % ( + cfg.CONF.taskflow.zk_hosts, + cfg.CONF.taskflow.zk_path, + )) + conf = _make_conf(connection) + be = persistence_backends.fetch(conf=conf, **kwargs) + with contextlib.closing(be.get_connection()) as conn: + conn.upgrade() + return be + + @staticmethod + def jobboard(board_name, conf=None, persistence=None, **kwargs): + """Factory method for creating a jobboard backend instance + + :param board_name: Name of the jobboard + :param conf: Configuration parameters for the jobboard backend. + :param persistence: A persistence backend instance to be used with the + jobboard. + :param kwargs: Keyword arguments to be passed forward to the + persistence backend constructor + :return: A persistence backend instance. + """ + if board_name is None: + board_name = cfg.CONF.taskflow.jobboard_name + + if conf is None: + conf = {'board': 'zookeeper'} + + conf.update({ + "path": "%s/jobs" % (cfg.CONF.taskflow.zk_path), + "hosts": cfg.CONF.taskflow.zk_hosts, + "timeout": cfg.CONF.taskflow.zk_timeout + }) + + jb = job_backends.fetch( + name=board_name, + conf=conf, + persistence=persistence, + **kwargs) + jb.connect() + return jb + + def post(self, flow_factory, job_args=None, + flow_args=None, flow_kwargs=None, tx_uuid=None): + """Method for posting a new job to the jobboard + + :param flow_factory: Flow factory function for creating a flow instance + that will be executed as part of the job. + :param job_args: 'store' arguments to be supplied to the engine + executing the flow for the job + :param flow_args: Positional arguments to be passed to the flow factory + function + :param flow_kwargs: Keyword arguments to be passed to the flow factory + function + :param tx_uuid: Transaction UUID which will be injected as 'tx_uuid' in + job_args. A tx_uuid will be generated if one is not + provided as an argument. + :return: A taskflow.job.Job instance that represents the job that was + posted. + """ + if isinstance(job_args, dict) and 'tx_uuid' in job_args: + raise AttributeError("tx_uuid needs to be provided as an argument" + "to Client.post, not as a member of job_args") + + if tx_uuid is None: + tx_uuid = uuidutils.generate_uuid() + + job_name = "%s[%s]" % (flow_factory.__name__, tx_uuid) + book = logbook.LogBook(job_name) + + if flow_factory is not None: + flow_detail = logbook.FlowDetail(job_name, str(uuid.uuid4())) + book.add(flow_detail) + + job_details = {'store': job_args or {}} + job_details['store'].update({ + 'tx_uuid': tx_uuid + }) + job_details['flow_uuid'] = flow_detail.uuid + + self._persistence.get_connection().save_logbook(book) + + engines.save_factory_details( + flow_detail, flow_factory, flow_args, flow_kwargs, + self._persistence) + + job = self._jobboard.post(job_name, book, details=job_details) + return job + + def joblist(self, only_unclaimed=False, ensure_fresh=False): + """Method for retrieving a list of jobs in the jobboard + + :param only_unclaimed: Return only unclaimed jobs + :param ensure_fresh: Return only the most recent jobs available. + Behavior of this parameter is backend specific. + :return: A list of jobs in the jobboard + """ + return list(self._jobboard.iterjobs(only_unclaimed=only_unclaimed, + ensure_fresh=ensure_fresh)) + + def delete(self, job=None, job_id=None): + """Method for deleting a job from the jobboard. + + Due to constraints in the available taskflow interfaces, deleting by + job_id entails retrieving and iterating over the list of all jobs in + the jobboard. Thus deleting by job rather than job_id can be faster. + + :param job: A Taskflow.job.Job representing the job to be deleted + :param job_id: Unique job_id referencing the job to be deleted + :return: + """ + if (job is None) == (job_id is None): + raise AttributeError("exactly one of either job or job_id must " + "be supplied") + + if job is None: + for j in self.joblist(): + if j.uuid == job_id: + job = j + + self._jobboard.claim(job, self._client_name) + self._jobboard.consume(job, self._client_name) diff --git a/cue/taskflow/service.py b/cue/taskflow/service.py new file mode 100644 index 00000000..e57ec46e --- /dev/null +++ b/cue/taskflow/service.py @@ -0,0 +1,163 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import contextlib +import logging as std_logging +import time + +import eventlet.event as event +from oslo.config import cfg +from oslo_log import log as logging +from taskflow.conductors import single_threaded + +import cue.openstack.common.service as os_service +import cue.taskflow.client as tf_client +import cue.version as version + + +LOG = logging.getLogger(__name__) + +CONF = cfg.CONF + +SUPPORTED_ENGINE_TYPES = ['serial', 'parallel'] + + +class ConductorService(os_service.Service): + """A Service wrapper for executing taskflow jobs in a conductor. + + This class provides an oslo.service.Service wrapper for executing taskflow + jobs in a conductor. + + This wrapper is compatible with both single process and multi-process + launchers. + """ + def __init__(self, host=None, jobboard_name=None, jobboard_conf=None, + persistence_conf=None, engine_conf=None, wait_timeout=None, + *args, **kwargs): + """Constructor for ConductorService + + :param host: Name to be used to identify the host running the conductor + :param jobboard_name: Name of the jobboard + :param jobboard_conf: Configuration parameters for the jobboard + backend. This configuration is passed forward to + :meth:`cue.taskflow.client.Client.jobboard`. + :param persistence_conf: Configuration parameters for the persistence + backend. This configuration is passed forward + to + :meth:`cue.taskflow.client.Client.persistence` + :param engine_conf: A dictionary containing onfiguration parameters for + the engine used by the conductor. The 'engine' + parameter specifies the engine type. Currently + only 'serial' and 'parallel' engine types are + supported. + :param wait_timeout: The number of seconds to wait for a new job to + appear when waiting for jobs. + :param args: Positional arguments to be passed to the jobboard and + persistence backend constructors + :param kwargs: Keyword arguemtns to be passed to the jobboard and + persistence backend constructors + """ + super(ConductorService, self).__init__(*args, **kwargs) + + if (engine_conf['engine'] not in SUPPORTED_ENGINE_TYPES): + raise ValueError("%s is not a supported engine type" + % engine_conf['engine']) + + self._host = host + + self._jobboard_name = jobboard_name + self._jobboard_conf = jobboard_conf + self._persistence_conf = persistence_conf + self._engine_conf = engine_conf + self._wait_timeout = wait_timeout + self._shutdown_event = event.Event() + self._args = args + self._kwargs = kwargs + + @classmethod + def create(cls, host=None, jobboard_name=None, jobboard_conf=None, + persistence_conf=None, engine_conf=None, wait_timeout=1, + *args, **kwargs): + """Factory method for creating a ConductorService instance + + :param host: Name to be used to identify the host running the conductor + :param jobboard_name: Name of the jobboard + :param jobboard_conf: Configuration parameters for the jobboard + backend. This configuration is passed forward to + :meth:`cue.taskflow.client.Client.jobboard`. + :param persistence_conf: Configuration parameters for the persistence + backend. This configuration is passed forward + to + :meth:`cue.taskflow.client.Client.persistence` + :param engine_conf: A dictionary containing onfiguration parameters for + the engine used by the conductor. The 'engine' + parameter specifies the engine type. Currently + only 'serial' and 'parallel' engine types are + supported. + :param wait_timeout: The number of seconds to wait for a new job to + appear when waiting for jobs. + :param args: Positional arguments to be passed to the jobboard and + persistence backend constructors + :param kwargs: Keyword arguments to be passed to the jobboard and + persistence backend constructors + :return: A :class:`.ConductorService` instance. + """ + engine_conf = engine_conf or {} + engine_conf.setdefault('engine', CONF.taskflow.engine_type) + + return cls(host, jobboard_name, jobboard_conf, persistence_conf, + engine_conf, wait_timeout, *args, **kwargs) + + def start(self): + """Interface to start the ConductorService.""" + super(ConductorService, self).start() + + CONF.log_opt_values(LOG, std_logging.INFO) + + version_string = version.version_info.version_string() + LOG.debug("Starting runner %s on board %s", + version_string, self._jobboard_name) + + with contextlib.closing( + tf_client.Client.persistence(conf=self._persistence_conf) + ) as persistence: + with contextlib.closing( + tf_client.Client.jobboard( + board_name=self._jobboard_name, + conf=self._jobboard_conf, + persistence=persistence, + ) + ) as jobboard: + self._conductor = single_threaded.SingleThreadedConductor( + name=self._host, + jobboard=jobboard, + persistence=persistence, + engine=self._engine_conf['engine'], + wait_timeout=self._wait_timeout) + + time.sleep(0.5) + self._conductor.run() + + self._shutdown_event.send() + + def stop(self): + """Interface to stop the ConductorService.""" + self._shutdown = True + self._conductor.stop() + super(ConductorService, self).stop() + + def wait(self): + """Interface to wait for ConductorService to complete.""" + self._shutdown_event.wait() + super(ConductorService, self).wait() diff --git a/cue/tests/taskflow/__init__.py b/cue/tests/taskflow/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cue/tests/taskflow/test_client.py b/cue/tests/taskflow/test_client.py new file mode 100644 index 00000000..f45a3a1d --- /dev/null +++ b/cue/tests/taskflow/test_client.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from oslo.utils import uuidutils +import taskflow.patterns.linear_flow as linear_flow +import taskflow.task +import zake.fake_client as fake_client + +import cue.taskflow.client as tf_client +import cue.tests.base as base + + +class TimesTwo(taskflow.task.Task): + def execute(self, test_arg): + return (test_arg * 2) + + +def create_flow(): + return linear_flow.Flow('test flow').add( + TimesTwo(), + ) + + +class TaskflowClientTest(base.TestCase): + def setUp(self): + super(TaskflowClientTest, self).setUp() + self._zk_client = fake_client.FakeClient() + self.persistence = tf_client.Client.persistence(client=self._zk_client) + self.jobboard = tf_client.Client.jobboard("test_board", + persistence=self.persistence, + client=self._zk_client) + self.tf_client = tf_client.Client("test_client", + persistence=self.persistence, + jobboard=self.jobboard) + + def tearDown(self): + super(TaskflowClientTest, self).tearDown() + + def test_post_job(self): + job_args = { + 'test_arg': 5 + } + tx_uuid = uuidutils.generate_uuid() + + pre_count = self.jobboard.job_count + job = self.tf_client.post(create_flow, job_args, tx_uuid=tx_uuid) + post_count = self.jobboard.job_count + expected = pre_count + 1 + + self.assertEqual(expected, post_count, + "expected %d jobs in the jobboard after a post, " + "got %d" % (expected, post_count)) + + job_list = self.tf_client.joblist() + self.assertEqual(expected, len(job_list), + "expected %d jobs in the joblist, " + "got %d" % (expected, post_count)) + posted_job = {} + for j in job_list: + if j.uuid == job.uuid: + posted_job = j + + self.assertDictEqual(posted_job.__dict__, job.__dict__, + "Job in jobboard differs from job returned by " + "Client.post method") + + pre_count = self.jobboard.job_count + self.tf_client.delete(job=job) + post_count = self.jobboard.job_count + expected = pre_count - 1 + + self.assertEqual(expected, post_count, + "expected %d jobs in the jobboard after a claim, " + "got %d" % (expected, post_count)) diff --git a/etc/cue/worker.conf.sample b/etc/cue/worker.conf.sample new file mode 100644 index 00000000..4baaf442 --- /dev/null +++ b/etc/cue/worker.conf.sample @@ -0,0 +1,57 @@ +[taskflow] + +# +# Options for taskflow based workflow engine +# + +# +# Persistence connection, used to persist workflow state information. +# If no connection string is supplied, zookeeper will be used by default +# with the zookeeper configuration provided in the zk_* configurations. +# Default: +# +#persistence_connection=zookeeper://127.0.0.1/cue/taskflow + +# +# Zookeeper host list. A single address can be provided for a standalone +# Zookeeper instance, or a comma separated list may be provided to +# connect to an ensemble of Zookeeper hosts. +# Default: localhost +# +#zk_hosts=127.0.0.1 + +# +# Zookeeper znode path that will be used as the root for all taskflow related +# information being persisted in Zookeeper. +# Default: /cue/taskflow +# +#zk_path=/cue/taskflow + +# +# Timeout (in seconds) for zookeeper operations +# Default: 10 +# +#zk_timeout=10 + +# +# Jobboard name +# Default: cue +# +#jobboard_name=cue + +# +# Taskflow Engine type used by the worker to run jobs. Only serial and parallel +# are supported +# Default: serial +# +#engine_type=serial + + + +[worker] + +# +# Number of worker processes to spawn. +# Default: 10 +# +#count=10 diff --git a/setup.cfg b/setup.cfg index 869ba384..4d6dcbcc 100644 --- a/setup.cfg +++ b/setup.cfg @@ -28,7 +28,6 @@ console_scripts = cue-api = cue.cmd.api:main cue-dbsync = cue.cmd.dbsync:main cue-manage = cue.cmd.manage:main - cue-monitor = cue.cmd.monitor:main cue-worker = cue.cmd.worker:main cue.database.migration_backend = diff --git a/test-requirements.txt b/test-requirements.txt index 9610be0f..34af9984 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -16,4 +16,4 @@ testtools>=0.9.36,!=1.2.0 sphinx>=1.1.2,!=1.2.0,!=1.3b1,<1.3 sphinxcontrib-pecanwsme>=0.8 oslosphinx>=2.2.0 # Apache-2.0 - +zake>=0.1.6