Implemented publisher API and multi-process worker

Pulled in runner/API and runner/worker from Endre's fork
https://github.com/ekarlso/cue/tree/runner with a few modifications.

worker-based engines are not being supported with the initial release of
cue, and zookeeper is being used by default as both the persistence and
jobboard backend.

Change-Id: Id05f1a39ebaa91061145f5fc6d069a3c8e154f66
This commit is contained in:
Min Pae 2015-01-23 14:31:34 -08:00
parent fe74174a70
commit df6930e2e0
12 changed files with 659 additions and 96 deletions

1
.gitignore vendored
View File

@ -54,6 +54,7 @@ coverage.xml
# Sphinx documentation
doc/build
doc/source/api
doc/source/autoindex.rst
# PyBuilder
target/

2
Vagrantfile vendored
View File

@ -84,6 +84,6 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
# Initialize project and environment
config.vm.provision "shell", inline: "pushd /vagrant && tox ; true"
config.vm.provision "shell", inline: "source /vagrant/.tox/py27/bin/activate ; pushd /vagrant && python setup.py develop"
config.vm.provision "shell", inline: "echo 'source /vagrant/.tox/py27/bin/activate' >> ~root/.profile"
#config.vm.provision "shell", inline: "echo 'source /vagrant/.tox/py27/bin/activate' >> ~root/.profile"
end

View File

@ -1,70 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import time
import oslo.config.cfg as cfg
from oslo_log import log
import taskflow.jobs.backends as job_backends
import taskflow.persistence.backends as persistence_backends
import cue.common.service as cue_service
PERSISTENCE_BACKEND_CONF = {
"connection": "zookeeper",
}
JOB_BACKEND_CONF = {
"board": "zookeeper",
}
CONF = cfg.CONF
def main():
cue_service.prepare_service(sys.argv)
LOG = log.getLogger(__name__)
with persistence_backends.backend(
PERSISTENCE_BACKEND_CONF.copy()
) as persistence:
with job_backends.backend(
'tutorial_simple',
{"board": "zookeeper",
"path": "/taskflow/jobs/tutorial_simple"
},
persistence=persistence
) as board_simple:
with job_backends.backend(
'tutorial_conduct',
{"board": "zookeeper",
"path": "/taskflow/jobs/tutorial_conduct"
},
persistence=persistence
) as board_conduct:
while True:
job_count = board_simple.job_count
job_count += board_conduct.job_count
LOG.info("%d outstanding jobs" % (job_count))
time.sleep(1)
if __name__ == "__main__":
sys.exit(main())

View File

@ -14,38 +14,52 @@
# under the License.
import logging
import sys
from taskflow.conductors import single_threaded
from taskflow.jobs import backends as job_backends
from taskflow.persistence import backends as persistence_backends
import eventlet
import oslo.config.cfg as cfg
import oslo_log.log as log
PERSISTENCE_BACKEND_CONF = {
#"connection": "mysql+pymysql://taskflow:taskflow@localhost/taskflow",
"connection": "zookeeper",
}
from cue.common.i18n import _LI # noqa
import cue.common.service as cue_service
import cue.openstack.common.service as os_service
import cue.taskflow.service as tf_service
JOB_BACKEND_CONF = {
"board": "zookeeper",
"path": "/taskflow/jobs/tutorial_conduct",
}
eventlet.monkey_patch(os=False)
WORKER_OPTS = [
cfg.IntOpt('count',
help="Number of worker processes to spawn",
default=10)
]
opt_group = cfg.OptGroup(
name='worker',
title='Options for cue worker'
)
cfg.CONF.register_group(opt_group)
cfg.CONF.register_opts(WORKER_OPTS, group=opt_group)
def main():
with persistence_backends.backend(
PERSISTENCE_BACKEND_CONF.copy()
) as persistence:
# Initialize environment
CONF = cfg.CONF
cue_service.prepare_service(sys.argv)
with job_backends.backend(
'tutorial_conduct',
JOB_BACKEND_CONF.copy(),
persistence=persistence
) as board:
# Log configuration and other startup information
LOG = log.getLogger(__name__)
LOG.info(_LI("Starting cue workers"))
LOG.info(_LI("Configuration:"))
CONF.log_opt_values(LOG, logging.INFO)
conductor = single_threaded.SingleThreadedConductor(
"conductor name", board, persistence, engine='serial')
conductor.run()
cue_worker = tf_service.ConductorService.create("cue-worker")
#cue_worker.start()
launcher = os_service.launch(service=cue_worker, workers=CONF.worker.count)
launcher.wait()
if __name__ == "__main__":

View File

@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
CONF = cfg.CONF
TF_OPTS = [
cfg.StrOpt('persistence_connection',
help="Persistence connection.",
default=None),
cfg.StrOpt('zk_hosts',
help="Zookeeper jobboard hosts.",
default="localhost"),
cfg.StrOpt('zk_path',
help="Zookeeper path for jobs.",
default='/cue/taskflow'),
cfg.IntOpt('zk_timeout',
help="Zookeeper operations timeout.",
default=10),
cfg.StrOpt('jobboard_name',
help="Board name.",
default='cue'),
cfg.StrOpt('engine_type',
help="Engine type.",
default='serial'),
]
opt_group = cfg.OptGroup(
name='taskflow',
title='Options for taskflow.'
)
CONF.register_group(opt_group)
CONF.register_opts(TF_OPTS, group='taskflow')

261
cue/taskflow/client.py Normal file
View File

@ -0,0 +1,261 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import uuid
from oslo.config import cfg
from oslo.utils import uuidutils
from six.moves import urllib_parse
import taskflow.engines as engines
import taskflow.jobs.backends as job_backends
import taskflow.persistence.backends as persistence_backends
import taskflow.persistence.logbook as logbook
def _make_conf(backend_uri):
"""A helper function for generating persistence backend configuration.
This function takes a backend configuration as a URI of the form
<backend type>://<backend host>/<path>.
:param backend_uri: URI for backend connection
:return: A configuration dictionary for use with
taskflow.persistence.backends
"""
parsed_url = urllib_parse.urlparse(backend_uri)
backend_type = parsed_url.scheme.lower()
if not backend_type:
raise ValueError("Unknown backend type for uri: %s" % (backend_type))
if backend_type in ('file', 'dir'):
conf = {
'path': parsed_url.path,
'connection': backend_uri,
}
elif backend_type in ('zookeeper',):
conf = {
'path': parsed_url.path,
'hosts': parsed_url.netloc,
'connection': backend_uri,
}
else:
conf = {
'connection': backend_uri,
}
return conf
class Client(object):
"""An abstraction for interacting with Taskflow
This class provides an abstraction for Taskflow to expose a simpler
interface for posting jobs to Taskflow Jobboards than what is provided
out of the box with Taskflow.
"""
def __init__(self, client_name, board_name=None, persistence=None,
jobboard=None, **kwargs):
"""Constructor for Client class
:param client_name: Name of the client interacting with the jobboard
:param board_name: Name of the jobboard
:param persistence: A persistence backend instance to be used in lieu
of auto-creating a backend instance based on
configuration parameters
:param jobboard: A jobboard backend instance to be used in lieu of
auto-creating a backend instance based on
configuration parameters
:param kwargs: Any keyword arguments to be passed forward to
persistence and job backend constructors
"""
super(Client, self).__init__()
if jobboard is None and board_name is None:
raise AttributeError("board_name must be supplied "
"if a jobboard is None")
self._client_name = client_name
self._persistence = persistence or Client.persistence(**kwargs)
self._jobboard = jobboard or Client.jobboard(board_name,
None,
self._persistence,
**kwargs)
def __del__(self):
"""Destructor for Client class."""
if self._jobboard is not None:
self._jobboard.close()
if self._persistence is not None:
self._persistence.close()
@classmethod
def create(cls, client_name, board_name=None, persistence=None,
jobboard=None, **kwargs):
"""Factory method for creating a Client instance
:param client_name: Name of the client interacting with the jobboard
:param board_name: Name of the jobboard
:param persistence: A persistence backend instance to be used in lieu
of auto-creating a backend instance based on
configuration parameters
:param jobboard: A jobboard backend instance to be used in lieu of
auto-creating a backend instance based on
configuration parameters
:param kwargs: Any keyword arguments to be passed forward to
persistence and job backend constructors
:return: A :class:`.Client` instance.
"""
return cls(client_name, board_name=board_name, persistence=persistence,
jobboard=jobboard, **kwargs)
@staticmethod
def persistence(conf=None, **kwargs):
"""Factory method for creating a persistence backend instance
:param conf: Configuration parameters for the persistence backend. If
no conf is provided, zookeeper configuration parameters
for the job backend will be used to configure the
persistence backend.
:param kwargs: Keyword arguments to be passed forward to the
persistence backend constructor
:return: A persistence backend instance.
"""
if conf is None:
connection = cfg.CONF.taskflow.persistence_connection
if connection is None:
connection = ("zookeeper://%s/%s"
% (
cfg.CONF.taskflow.zk_hosts,
cfg.CONF.taskflow.zk_path,
))
conf = _make_conf(connection)
be = persistence_backends.fetch(conf=conf, **kwargs)
with contextlib.closing(be.get_connection()) as conn:
conn.upgrade()
return be
@staticmethod
def jobboard(board_name, conf=None, persistence=None, **kwargs):
"""Factory method for creating a jobboard backend instance
:param board_name: Name of the jobboard
:param conf: Configuration parameters for the jobboard backend.
:param persistence: A persistence backend instance to be used with the
jobboard.
:param kwargs: Keyword arguments to be passed forward to the
persistence backend constructor
:return: A persistence backend instance.
"""
if board_name is None:
board_name = cfg.CONF.taskflow.jobboard_name
if conf is None:
conf = {'board': 'zookeeper'}
conf.update({
"path": "%s/jobs" % (cfg.CONF.taskflow.zk_path),
"hosts": cfg.CONF.taskflow.zk_hosts,
"timeout": cfg.CONF.taskflow.zk_timeout
})
jb = job_backends.fetch(
name=board_name,
conf=conf,
persistence=persistence,
**kwargs)
jb.connect()
return jb
def post(self, flow_factory, job_args=None,
flow_args=None, flow_kwargs=None, tx_uuid=None):
"""Method for posting a new job to the jobboard
:param flow_factory: Flow factory function for creating a flow instance
that will be executed as part of the job.
:param job_args: 'store' arguments to be supplied to the engine
executing the flow for the job
:param flow_args: Positional arguments to be passed to the flow factory
function
:param flow_kwargs: Keyword arguments to be passed to the flow factory
function
:param tx_uuid: Transaction UUID which will be injected as 'tx_uuid' in
job_args. A tx_uuid will be generated if one is not
provided as an argument.
:return: A taskflow.job.Job instance that represents the job that was
posted.
"""
if isinstance(job_args, dict) and 'tx_uuid' in job_args:
raise AttributeError("tx_uuid needs to be provided as an argument"
"to Client.post, not as a member of job_args")
if tx_uuid is None:
tx_uuid = uuidutils.generate_uuid()
job_name = "%s[%s]" % (flow_factory.__name__, tx_uuid)
book = logbook.LogBook(job_name)
if flow_factory is not None:
flow_detail = logbook.FlowDetail(job_name, str(uuid.uuid4()))
book.add(flow_detail)
job_details = {'store': job_args or {}}
job_details['store'].update({
'tx_uuid': tx_uuid
})
job_details['flow_uuid'] = flow_detail.uuid
self._persistence.get_connection().save_logbook(book)
engines.save_factory_details(
flow_detail, flow_factory, flow_args, flow_kwargs,
self._persistence)
job = self._jobboard.post(job_name, book, details=job_details)
return job
def joblist(self, only_unclaimed=False, ensure_fresh=False):
"""Method for retrieving a list of jobs in the jobboard
:param only_unclaimed: Return only unclaimed jobs
:param ensure_fresh: Return only the most recent jobs available.
Behavior of this parameter is backend specific.
:return: A list of jobs in the jobboard
"""
return list(self._jobboard.iterjobs(only_unclaimed=only_unclaimed,
ensure_fresh=ensure_fresh))
def delete(self, job=None, job_id=None):
"""Method for deleting a job from the jobboard.
Due to constraints in the available taskflow interfaces, deleting by
job_id entails retrieving and iterating over the list of all jobs in
the jobboard. Thus deleting by job rather than job_id can be faster.
:param job: A Taskflow.job.Job representing the job to be deleted
:param job_id: Unique job_id referencing the job to be deleted
:return:
"""
if (job is None) == (job_id is None):
raise AttributeError("exactly one of either job or job_id must "
"be supplied")
if job is None:
for j in self.joblist():
if j.uuid == job_id:
job = j
self._jobboard.claim(job, self._client_name)
self._jobboard.consume(job, self._client_name)

163
cue/taskflow/service.py Normal file
View File

@ -0,0 +1,163 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import logging as std_logging
import time
import eventlet.event as event
from oslo.config import cfg
from oslo_log import log as logging
from taskflow.conductors import single_threaded
import cue.openstack.common.service as os_service
import cue.taskflow.client as tf_client
import cue.version as version
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
SUPPORTED_ENGINE_TYPES = ['serial', 'parallel']
class ConductorService(os_service.Service):
"""A Service wrapper for executing taskflow jobs in a conductor.
This class provides an oslo.service.Service wrapper for executing taskflow
jobs in a conductor.
This wrapper is compatible with both single process and multi-process
launchers.
"""
def __init__(self, host=None, jobboard_name=None, jobboard_conf=None,
persistence_conf=None, engine_conf=None, wait_timeout=None,
*args, **kwargs):
"""Constructor for ConductorService
:param host: Name to be used to identify the host running the conductor
:param jobboard_name: Name of the jobboard
:param jobboard_conf: Configuration parameters for the jobboard
backend. This configuration is passed forward to
:meth:`cue.taskflow.client.Client.jobboard`.
:param persistence_conf: Configuration parameters for the persistence
backend. This configuration is passed forward
to
:meth:`cue.taskflow.client.Client.persistence`
:param engine_conf: A dictionary containing onfiguration parameters for
the engine used by the conductor. The 'engine'
parameter specifies the engine type. Currently
only 'serial' and 'parallel' engine types are
supported.
:param wait_timeout: The number of seconds to wait for a new job to
appear when waiting for jobs.
:param args: Positional arguments to be passed to the jobboard and
persistence backend constructors
:param kwargs: Keyword arguemtns to be passed to the jobboard and
persistence backend constructors
"""
super(ConductorService, self).__init__(*args, **kwargs)
if (engine_conf['engine'] not in SUPPORTED_ENGINE_TYPES):
raise ValueError("%s is not a supported engine type"
% engine_conf['engine'])
self._host = host
self._jobboard_name = jobboard_name
self._jobboard_conf = jobboard_conf
self._persistence_conf = persistence_conf
self._engine_conf = engine_conf
self._wait_timeout = wait_timeout
self._shutdown_event = event.Event()
self._args = args
self._kwargs = kwargs
@classmethod
def create(cls, host=None, jobboard_name=None, jobboard_conf=None,
persistence_conf=None, engine_conf=None, wait_timeout=1,
*args, **kwargs):
"""Factory method for creating a ConductorService instance
:param host: Name to be used to identify the host running the conductor
:param jobboard_name: Name of the jobboard
:param jobboard_conf: Configuration parameters for the jobboard
backend. This configuration is passed forward to
:meth:`cue.taskflow.client.Client.jobboard`.
:param persistence_conf: Configuration parameters for the persistence
backend. This configuration is passed forward
to
:meth:`cue.taskflow.client.Client.persistence`
:param engine_conf: A dictionary containing onfiguration parameters for
the engine used by the conductor. The 'engine'
parameter specifies the engine type. Currently
only 'serial' and 'parallel' engine types are
supported.
:param wait_timeout: The number of seconds to wait for a new job to
appear when waiting for jobs.
:param args: Positional arguments to be passed to the jobboard and
persistence backend constructors
:param kwargs: Keyword arguments to be passed to the jobboard and
persistence backend constructors
:return: A :class:`.ConductorService` instance.
"""
engine_conf = engine_conf or {}
engine_conf.setdefault('engine', CONF.taskflow.engine_type)
return cls(host, jobboard_name, jobboard_conf, persistence_conf,
engine_conf, wait_timeout, *args, **kwargs)
def start(self):
"""Interface to start the ConductorService."""
super(ConductorService, self).start()
CONF.log_opt_values(LOG, std_logging.INFO)
version_string = version.version_info.version_string()
LOG.debug("Starting runner %s on board %s",
version_string, self._jobboard_name)
with contextlib.closing(
tf_client.Client.persistence(conf=self._persistence_conf)
) as persistence:
with contextlib.closing(
tf_client.Client.jobboard(
board_name=self._jobboard_name,
conf=self._jobboard_conf,
persistence=persistence,
)
) as jobboard:
self._conductor = single_threaded.SingleThreadedConductor(
name=self._host,
jobboard=jobboard,
persistence=persistence,
engine=self._engine_conf['engine'],
wait_timeout=self._wait_timeout)
time.sleep(0.5)
self._conductor.run()
self._shutdown_event.send()
def stop(self):
"""Interface to stop the ConductorService."""
self._shutdown = True
self._conductor.stop()
super(ConductorService, self).stop()
def wait(self):
"""Interface to wait for ConductorService to complete."""
self._shutdown_event.wait()
super(ConductorService, self).wait()

View File

View File

@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.utils import uuidutils
import taskflow.patterns.linear_flow as linear_flow
import taskflow.task
import zake.fake_client as fake_client
import cue.taskflow.client as tf_client
import cue.tests.base as base
class TimesTwo(taskflow.task.Task):
def execute(self, test_arg):
return (test_arg * 2)
def create_flow():
return linear_flow.Flow('test flow').add(
TimesTwo(),
)
class TaskflowClientTest(base.TestCase):
def setUp(self):
super(TaskflowClientTest, self).setUp()
self._zk_client = fake_client.FakeClient()
self.persistence = tf_client.Client.persistence(client=self._zk_client)
self.jobboard = tf_client.Client.jobboard("test_board",
persistence=self.persistence,
client=self._zk_client)
self.tf_client = tf_client.Client("test_client",
persistence=self.persistence,
jobboard=self.jobboard)
def tearDown(self):
super(TaskflowClientTest, self).tearDown()
def test_post_job(self):
job_args = {
'test_arg': 5
}
tx_uuid = uuidutils.generate_uuid()
pre_count = self.jobboard.job_count
job = self.tf_client.post(create_flow, job_args, tx_uuid=tx_uuid)
post_count = self.jobboard.job_count
expected = pre_count + 1
self.assertEqual(expected, post_count,
"expected %d jobs in the jobboard after a post, "
"got %d" % (expected, post_count))
job_list = self.tf_client.joblist()
self.assertEqual(expected, len(job_list),
"expected %d jobs in the joblist, "
"got %d" % (expected, post_count))
posted_job = {}
for j in job_list:
if j.uuid == job.uuid:
posted_job = j
self.assertDictEqual(posted_job.__dict__, job.__dict__,
"Job in jobboard differs from job returned by "
"Client.post method")
pre_count = self.jobboard.job_count
self.tf_client.delete(job=job)
post_count = self.jobboard.job_count
expected = pre_count - 1
self.assertEqual(expected, post_count,
"expected %d jobs in the jobboard after a claim, "
"got %d" % (expected, post_count))

View File

@ -0,0 +1,57 @@
[taskflow]
#
# Options for taskflow based workflow engine
#
#
# Persistence connection, used to persist workflow state information.
# If no connection string is supplied, zookeeper will be used by default
# with the zookeeper configuration provided in the zk_* configurations.
# Default:
#
#persistence_connection=zookeeper://127.0.0.1/cue/taskflow
#
# Zookeeper host list. A single address can be provided for a standalone
# Zookeeper instance, or a comma separated list may be provided to
# connect to an ensemble of Zookeeper hosts.
# Default: localhost
#
#zk_hosts=127.0.0.1
#
# Zookeeper znode path that will be used as the root for all taskflow related
# information being persisted in Zookeeper.
# Default: /cue/taskflow
#
#zk_path=/cue/taskflow
#
# Timeout (in seconds) for zookeeper operations
# Default: 10
#
#zk_timeout=10
#
# Jobboard name
# Default: cue
#
#jobboard_name=cue
#
# Taskflow Engine type used by the worker to run jobs. Only serial and parallel
# are supported
# Default: serial
#
#engine_type=serial
[worker]
#
# Number of worker processes to spawn.
# Default: 10
#
#count=10

View File

@ -28,7 +28,6 @@ console_scripts =
cue-api = cue.cmd.api:main
cue-dbsync = cue.cmd.dbsync:main
cue-manage = cue.cmd.manage:main
cue-monitor = cue.cmd.monitor:main
cue-worker = cue.cmd.worker:main
cue.database.migration_backend =

View File

@ -16,4 +16,4 @@ testtools>=0.9.36,!=1.2.0
sphinx>=1.1.2,!=1.2.0,!=1.3b1,<1.3
sphinxcontrib-pecanwsme>=0.8
oslosphinx>=2.2.0 # Apache-2.0
zake>=0.1.6