Merge "Provide TextJob and TextWorker for convenience"

This commit is contained in:
Jenkins 2017-05-17 16:58:31 +00:00 committed by Gerrit Code Review
commit 50616da084
2 changed files with 392 additions and 108 deletions

View File

@ -1426,8 +1426,8 @@ class Client(BaseClient):
if job.unique is None:
unique = b''
else:
unique = job.unique
data = b'\x00'.join((job.binary_name, unique, job.arguments))
unique = job.binary_unique
data = b'\x00'.join((job.binary_name, unique, job.binary_arguments))
if background:
if precedence == PRECEDENCE_NORMAL:
cmd = constants.SUBMIT_JOB_BG
@ -1696,6 +1696,159 @@ class FunctionRecord(object):
id(self), self.name, self.timeout)
class BaseJob(object):
def __init__(self, name, arguments, unique=None, handle=None):
self._name = convert_to_bytes(name)
self._validate_arguments(arguments)
self._arguments = convert_to_bytes(arguments)
self._unique = convert_to_bytes(unique)
self.handle = handle
self.connection = None
def _validate_arguments(self, arguments):
if (not isinstance(arguments, bytes) and
not isinstance(arguments, bytearray)):
raise TypeError("arguments must be of type bytes or bytearray")
@property
def arguments(self):
return self._arguments
@arguments.setter
def arguments(self, value):
self._arguments = value
@property
def unique(self):
return self._unique
@unique.setter
def unique(self, value):
self._unique = value
@property
def name(self):
if isinstance(self._name, six.binary_type):
return self._name.decode('utf-8')
return self._name
@name.setter
def name(self, value):
if isinstance(value, six.text_type):
value = value.encode('utf-8')
self._name = value
@property
def binary_name(self):
return self._name
@property
def binary_arguments(self):
return self._arguments
@property
def binary_unique(self):
return self._unique
def __repr__(self):
return '<gear.Job 0x%x handle: %s name: %s unique: %s>' % (
id(self), self.handle, self.name, self.unique)
class WorkerJob(BaseJob):
"""A job that Gearman has assigned to a Worker. Not intended to
be instantiated directly, but rather returned by
:py:meth:`Worker.getJob`.
:arg str handle: The job handle assigned by gearman.
:arg str name: The name of the job.
:arg bytes arguments: The opaque data blob passed to the worker
as arguments.
:arg str unique: A byte string to uniquely identify the job to Gearman
(optional).
The following instance attributes are available:
**name** (str)
The name of the job. Assumed to be utf-8.
**arguments** (bytes)
The opaque data blob passed to the worker as arguments.
**unique** (str or None)
The unique ID of the job (if supplied).
**handle** (bytes)
The Gearman job handle.
**connection** (:py:class:`Connection` or None)
The connection associated with the job. Only set after the job
has been submitted to a Gearman server.
"""
def __init__(self, handle, name, arguments, unique=None):
super(WorkerJob, self).__init__(name, arguments, unique, handle)
def sendWorkData(self, data=b''):
"""Send a WORK_DATA packet to the client.
:arg bytes data: The data to be sent to the client (optional).
"""
data = self.handle + b'\x00' + data
p = Packet(constants.REQ, constants.WORK_DATA, data)
self.connection.sendPacket(p)
def sendWorkWarning(self, data=b''):
"""Send a WORK_WARNING packet to the client.
:arg bytes data: The data to be sent to the client (optional).
"""
data = self.handle + b'\x00' + data
p = Packet(constants.REQ, constants.WORK_WARNING, data)
self.connection.sendPacket(p)
def sendWorkStatus(self, numerator, denominator):
"""Send a WORK_STATUS packet to the client.
Sends a numerator and denominator that together represent the
fraction complete of the job.
:arg numeric numerator: The numerator of the fraction complete.
:arg numeric denominator: The denominator of the fraction complete.
"""
data = (self.handle + b'\x00' +
str(numerator).encode('utf8') + b'\x00' +
str(denominator).encode('utf8'))
p = Packet(constants.REQ, constants.WORK_STATUS, data)
self.connection.sendPacket(p)
def sendWorkComplete(self, data=b''):
"""Send a WORK_COMPLETE packet to the client.
:arg bytes data: The data to be sent to the client (optional).
"""
data = self.handle + b'\x00' + data
p = Packet(constants.REQ, constants.WORK_COMPLETE, data)
self.connection.sendPacket(p)
def sendWorkFail(self):
"Send a WORK_FAIL packet to the client."
p = Packet(constants.REQ, constants.WORK_FAIL, self.handle)
self.connection.sendPacket(p)
def sendWorkException(self, data=b''):
"""Send a WORK_EXCEPTION packet to the client.
:arg bytes data: The exception data to be sent to the client
(optional).
"""
data = self.handle + b'\x00' + data
p = Packet(constants.REQ, constants.WORK_EXCEPTION, data)
self.connection.sendPacket(p)
class Worker(BaseClient):
"""A Gearman worker.
@ -1708,6 +1861,8 @@ class Worker(BaseClient):
is deprecated, use client_id instead.
"""
job_class = WorkerJob
def __init__(self, client_id=None, worker_id=None):
if not client_id or worker_id:
raise Exception("A client_id must be provided")
@ -2027,7 +2182,7 @@ class Worker(BaseClient):
arguments, unique)
def _handleJobAssignment(self, packet, handle, name, arguments, unique):
job = WorkerJob(handle, name, arguments, unique)
job = self.job_class(handle, name, arguments, unique)
job.connection = packet.connection
self.job_lock.acquire()
@ -2043,38 +2198,6 @@ class Worker(BaseClient):
self.job_lock.release()
class BaseJob(object):
def __init__(self, name, arguments, unique=None, handle=None):
self._name = convert_to_bytes(name)
if (not isinstance(arguments, bytes) and
not isinstance(arguments, bytearray)):
raise TypeError("arguments must be of type bytes or bytearray")
self.arguments = arguments
self.unique = convert_to_bytes(unique)
self.handle = handle
self.connection = None
@property
def name(self):
if isinstance(self._name, six.binary_type):
return self._name.decode('utf-8')
return self._name
@name.setter
def name(self, value):
if isinstance(value, six.text_type):
value = value.encode('utf-8')
self._name = value
@property
def binary_name(self):
return self._name
def __repr__(self):
return '<gear.Job 0x%x handle: %s name: %s unique: %s>' % (
id(self), self.handle, self.name, self.unique)
class Job(BaseJob):
"""A job to run or being run by Gearman.
@ -2097,7 +2220,9 @@ class Job(BaseJob):
**data** (list of byte-arrays)
The result data returned from Gearman. Each packet appends an
element to the list. Depending on the nature of the data, the
elements may need to be concatenated before use.
elements may need to be concatenated before use. This is returned
as a snapshot copy of the data to prevent accidental attempts at
modification which will be lost.
**exception** (bytes or None)
Exception information returned from Gearman. None if no exception
has been received.
@ -2127,10 +2252,12 @@ class Job(BaseJob):
has been submitted to a Gearman server.
"""
data_type = list
def __init__(self, name, arguments, unique=None):
super(Job, self).__init__(name, arguments, unique)
self.data = []
self.exception = None
self._data = self.data_type()
self._exception = None
self.warning = False
self.complete = False
self.failure = False
@ -2140,99 +2267,181 @@ class Job(BaseJob):
self.known = None
self.running = None
@property
def binary_data(self):
for value in self._data:
if isinstance(value, six.text_type):
value = value.encode('utf-8')
yield value
class WorkerJob(BaseJob):
"""A job that Gearman has assigned to a Worker. Not intended to
be instantiated directly, but rather returned by
:py:meth:`Worker.getJob`.
@property
def data(self):
return self._data
:arg str handle: The job handle assigned by gearman.
:arg str name: The name of the job.
:arg bytes arguments: The opaque data blob passed to the worker
as arguments.
:arg str unique: A byte string to uniquely identify the job to Gearman
(optional).
@data.setter
def data(self, value):
if not isinstance(value, self.data_type):
raise ValueError(
"data attribute must be {}".format(self.data_type))
self._data = value
The following instance attributes are available:
@property
def exception(self):
return self._exception
@exception.setter
def exception(self, value):
self._data = value
class TextJobArguments(object):
"""Assumes utf-8 arguments in addition to name
If one is always dealing in valid utf-8, using this job class relieves one
of the need to encode/decode constantly."""
def _validate_arguments(self, arguments):
pass
@property
def arguments(self):
args = self._arguments
if isinstance(args, six.binary_type):
return args.decode('utf-8')
return args
@arguments.setter
def arguments(self, value):
if not isinstance(value, six.binary_type):
value = value.encode('utf-8')
self._arguments = value
class TextJobUnique(object):
"""Assumes utf-8 unique
If one is always dealing in valid utf-8, using this job class relieves one
of the need to encode/decode constantly."""
@property
def unique(self):
unique = self._unique
if isinstance(unique, six.binary_type):
return unique.decode('utf-8')
return unique
@unique.setter
def unique(self, value):
if not isinstance(value, six.binary_type):
value = value.encode('utf-8')
self._unique = value
class TextList(list):
def append(self, x):
if isinstance(x, six.binary_type):
x = x.decode('utf-8')
super(TextList, self).append(x)
def extend(self, iterable):
def _iter():
for value in iterable:
if isinstance(value, six.binary_type):
yield value.decode('utf-8')
else:
yield value
super(TextList, self).extend(_iter)
def insert(self, i, x):
if isinstance(x, six.binary_type):
x = x.decode('utf-8')
super(TextList, self).insert(i, x)
class TextJob(TextJobArguments, TextJobUnique, Job):
""" Sends and receives UTF-8 arguments and data.
Use this instead of Job when you only expect to send valid UTF-8 through
gearman. It will automatically encode arguments and work data as UTF-8, and
any jobs fetched from this worker will have their arguments and data
decoded assuming they are valid UTF-8, and thus return strings.
Attributes and method signatures are thes ame as Job except as noted here:
** arguments ** (str) This will be returned as a string.
** data ** (tuple of str) This will be returned as a tuble of strings.
**name** (str)
The name of the job. Assumed to be utf-8.
**arguments** (bytes)
The opaque data blob passed to the worker as arguments.
**unique** (str or None)
The unique ID of the job (if supplied).
**handle** (bytes)
The Gearman job handle.
**connection** (:py:class:`Connection` or None)
The connection associated with the job. Only set after the job
has been submitted to a Gearman server.
"""
def __init__(self, handle, name, arguments, unique=None):
super(WorkerJob, self).__init__(name, arguments, unique, handle)
data_type = TextList
def sendWorkData(self, data=b''):
@property
def exception(self):
exception = self._exception
if isinstance(exception, six.binary_type):
return exception.decode('utf-8')
return exception
@exception.setter
def exception(self, value):
if not isinstance(value, six.binary_type):
value = value.encode('utf-8')
self._exception = value
class TextWorkerJob(TextJobArguments, TextJobUnique, WorkerJob):
""" Sends and receives UTF-8 arguments and data.
See TextJob. sendWorkData and sendWorkWarning accept strings
and will encode them as UTF-8.
"""
def sendWorkData(self, data=''):
"""Send a WORK_DATA packet to the client.
:arg bytes data: The data to be sent to the client (optional).
:arg str data: The data to be sent to the client (optional).
"""
if isinstance(data, six.text_type):
data = data.encode('utf8')
return super(TextWorkerJob, self).sendWorkData(data)
data = self.handle + b'\x00' + data
p = Packet(constants.REQ, constants.WORK_DATA, data)
self.connection.sendPacket(p)
def sendWorkWarning(self, data=b''):
def sendWorkWarning(self, data=''):
"""Send a WORK_WARNING packet to the client.
:arg bytes data: The data to be sent to the client (optional).
:arg str data: The data to be sent to the client (optional).
"""
data = self.handle + b'\x00' + data
p = Packet(constants.REQ, constants.WORK_WARNING, data)
self.connection.sendPacket(p)
if isinstance(data, six.text_type):
data = data.encode('utf8')
return super(TextWorkerJob, self).sendWorkWarning(data)
def sendWorkStatus(self, numerator, denominator):
"""Send a WORK_STATUS packet to the client.
Sends a numerator and denominator that together represent the
fraction complete of the job.
:arg numeric numerator: The numerator of the fraction complete.
:arg numeric denominator: The denominator of the fraction complete.
"""
data = (self.handle + b'\x00' +
str(numerator).encode('utf8') + b'\x00' +
str(denominator).encode('utf8'))
p = Packet(constants.REQ, constants.WORK_STATUS, data)
self.connection.sendPacket(p)
def sendWorkComplete(self, data=b''):
def sendWorkComplete(self, data=''):
"""Send a WORK_COMPLETE packet to the client.
:arg bytes data: The data to be sent to the client (optional).
:arg str data: The data to be sent to the client (optional).
"""
if isinstance(data, six.text_type):
data = data.encode('utf8')
return super(TextWorkerJob, self).sendWorkComplete(data)
data = self.handle + b'\x00' + data
p = Packet(constants.REQ, constants.WORK_COMPLETE, data)
self.connection.sendPacket(p)
def sendWorkFail(self):
"Send a WORK_FAIL packet to the client."
p = Packet(constants.REQ, constants.WORK_FAIL, self.handle)
self.connection.sendPacket(p)
def sendWorkException(self, data=b''):
def sendWorkException(self, data=''):
"""Send a WORK_EXCEPTION packet to the client.
:arg bytes data: The exception data to be sent to the client
(optional).
:arg str data: The data to be sent to the client (optional).
"""
data = self.handle + b'\x00' + data
p = Packet(constants.REQ, constants.WORK_EXCEPTION, data)
self.connection.sendPacket(p)
if isinstance(data, six.text_type):
data = data.encode('utf8')
return super(TextWorkerJob, self).sendWorkException(data)
class TextWorker(Worker):
""" Sends and receives UTF-8 only.
See TextJob.
"""
job_class = TextWorkerJob
class BaseBinaryJob(object):
@ -3153,7 +3362,7 @@ class Server(BaseClientServer):
self.sendNoJob(packet.connection)
def sendJobAssignUniq(self, connection, job):
unique = job.unique
unique = job.binary_unique
if not unique:
unique = b''
data = b'\x00'.join((job.handle, job.name, unique, job.arguments))

View File

@ -16,6 +16,7 @@
import os
import threading
import time
import uuid
from OpenSSL import crypto
import fixtures
@ -149,5 +150,79 @@ class TestFunctional(tests.BaseTestCase):
self.assertEqual('test', workerjob.name)
class TestFunctionalText(tests.BaseTestCase):
def setUp(self):
super(TestFunctionalText, self).setUp()
self.server = gear.Server(0)
self.client = gear.Client('client')
self.worker = gear.TextWorker('worker')
self.client.addServer('127.0.0.1', self.server.port)
self.worker.addServer('127.0.0.1', self.server.port)
self.client.waitForServer()
self.worker.waitForServer()
def test_text_job(self):
self.worker.registerFunction('test')
for jobcount in range(2):
job = gear.TextJob('test', 'testdata')
self.client.submitJob(job)
self.assertNotEqual(job.handle, None)
workerjob = self.worker.getJob()
self.assertEqual(workerjob.handle, job.handle)
self.assertEqual(workerjob.arguments, 'testdata')
workerjob.sendWorkData('workdata')
workerjob.sendWorkComplete()
for count in iterate_timeout(30, "job completion"):
if job.complete:
break
self.assertTrue(job.complete)
self.assertEqual(job.data, ['workdata'])
def test_text_job_unique(self):
self.worker.registerFunction('test')
for jobcount in range(2):
jobunique = uuid.uuid4().hex
job = gear.TextJob('test', 'testdata', unique=jobunique)
self.client.submitJob(job)
self.assertNotEqual(job.handle, None)
workerjob = self.worker.getJob()
self.assertEqual(workerjob.handle, job.handle)
self.assertEqual(workerjob.arguments, 'testdata')
workerjob.sendWorkData('workdata')
workerjob.sendWorkComplete()
for count in iterate_timeout(30, "job completion"):
if job.complete:
break
self.assertTrue(job.complete)
self.assertEqual(job.data, ['workdata'])
self.assertEqual(job.unique, jobunique)
self.assertEqual(workerjob.unique, jobunique)
def test_text_job_exception(self):
self.worker.registerFunction('test')
for jobcount in range(2):
job = gear.TextJob('test', 'testdata')
self.client.submitJob(job)
self.assertNotEqual(job.handle, None)
workerjob = self.worker.getJob()
self.assertEqual(workerjob.handle, job.handle)
self.assertEqual(workerjob.arguments, 'testdata')
workerjob.sendWorkException('work failed')
for count in iterate_timeout(30, "job completion"):
if job.complete:
break
self.assertTrue(job.complete)
self.assertEqual(job.exception, 'work failed')
def load_tests(loader, in_tests, pattern):
return testscenarios.load_tests_apply_scenarios(loader, in_tests, pattern)