diff --git a/gear/__init__.py b/gear/__init__.py index 5db3b78..7aaeb19 100644 --- a/gear/__init__.py +++ b/gear/__init__.py @@ -1426,8 +1426,8 @@ class Client(BaseClient): if job.unique is None: unique = b'' else: - unique = job.unique - data = b'\x00'.join((job.binary_name, unique, job.arguments)) + unique = job.binary_unique + data = b'\x00'.join((job.binary_name, unique, job.binary_arguments)) if background: if precedence == PRECEDENCE_NORMAL: cmd = constants.SUBMIT_JOB_BG @@ -1696,6 +1696,159 @@ class FunctionRecord(object): id(self), self.name, self.timeout) +class BaseJob(object): + def __init__(self, name, arguments, unique=None, handle=None): + self._name = convert_to_bytes(name) + self._validate_arguments(arguments) + self._arguments = convert_to_bytes(arguments) + self._unique = convert_to_bytes(unique) + self.handle = handle + self.connection = None + + def _validate_arguments(self, arguments): + if (not isinstance(arguments, bytes) and + not isinstance(arguments, bytearray)): + raise TypeError("arguments must be of type bytes or bytearray") + + @property + def arguments(self): + return self._arguments + + @arguments.setter + def arguments(self, value): + self._arguments = value + + @property + def unique(self): + return self._unique + + @unique.setter + def unique(self, value): + self._unique = value + + @property + def name(self): + if isinstance(self._name, six.binary_type): + return self._name.decode('utf-8') + return self._name + + @name.setter + def name(self, value): + if isinstance(value, six.text_type): + value = value.encode('utf-8') + self._name = value + + @property + def binary_name(self): + return self._name + + @property + def binary_arguments(self): + return self._arguments + + @property + def binary_unique(self): + return self._unique + + def __repr__(self): + return '' % ( + id(self), self.handle, self.name, self.unique) + + +class WorkerJob(BaseJob): + """A job that Gearman has assigned to a Worker. Not intended to + be instantiated directly, but rather returned by + :py:meth:`Worker.getJob`. + + :arg str handle: The job handle assigned by gearman. + :arg str name: The name of the job. + :arg bytes arguments: The opaque data blob passed to the worker + as arguments. + :arg str unique: A byte string to uniquely identify the job to Gearman + (optional). + + The following instance attributes are available: + + **name** (str) + The name of the job. Assumed to be utf-8. + **arguments** (bytes) + The opaque data blob passed to the worker as arguments. + **unique** (str or None) + The unique ID of the job (if supplied). + **handle** (bytes) + The Gearman job handle. + **connection** (:py:class:`Connection` or None) + The connection associated with the job. Only set after the job + has been submitted to a Gearman server. + """ + + def __init__(self, handle, name, arguments, unique=None): + super(WorkerJob, self).__init__(name, arguments, unique, handle) + + def sendWorkData(self, data=b''): + """Send a WORK_DATA packet to the client. + + :arg bytes data: The data to be sent to the client (optional). + """ + + data = self.handle + b'\x00' + data + p = Packet(constants.REQ, constants.WORK_DATA, data) + self.connection.sendPacket(p) + + def sendWorkWarning(self, data=b''): + """Send a WORK_WARNING packet to the client. + + :arg bytes data: The data to be sent to the client (optional). + """ + + data = self.handle + b'\x00' + data + p = Packet(constants.REQ, constants.WORK_WARNING, data) + self.connection.sendPacket(p) + + def sendWorkStatus(self, numerator, denominator): + """Send a WORK_STATUS packet to the client. + + Sends a numerator and denominator that together represent the + fraction complete of the job. + + :arg numeric numerator: The numerator of the fraction complete. + :arg numeric denominator: The denominator of the fraction complete. + """ + + data = (self.handle + b'\x00' + + str(numerator).encode('utf8') + b'\x00' + + str(denominator).encode('utf8')) + p = Packet(constants.REQ, constants.WORK_STATUS, data) + self.connection.sendPacket(p) + + def sendWorkComplete(self, data=b''): + """Send a WORK_COMPLETE packet to the client. + + :arg bytes data: The data to be sent to the client (optional). + """ + + data = self.handle + b'\x00' + data + p = Packet(constants.REQ, constants.WORK_COMPLETE, data) + self.connection.sendPacket(p) + + def sendWorkFail(self): + "Send a WORK_FAIL packet to the client." + + p = Packet(constants.REQ, constants.WORK_FAIL, self.handle) + self.connection.sendPacket(p) + + def sendWorkException(self, data=b''): + """Send a WORK_EXCEPTION packet to the client. + + :arg bytes data: The exception data to be sent to the client + (optional). + """ + + data = self.handle + b'\x00' + data + p = Packet(constants.REQ, constants.WORK_EXCEPTION, data) + self.connection.sendPacket(p) + + class Worker(BaseClient): """A Gearman worker. @@ -1708,6 +1861,8 @@ class Worker(BaseClient): is deprecated, use client_id instead. """ + job_class = WorkerJob + def __init__(self, client_id=None, worker_id=None): if not client_id or worker_id: raise Exception("A client_id must be provided") @@ -2027,7 +2182,7 @@ class Worker(BaseClient): arguments, unique) def _handleJobAssignment(self, packet, handle, name, arguments, unique): - job = WorkerJob(handle, name, arguments, unique) + job = self.job_class(handle, name, arguments, unique) job.connection = packet.connection self.job_lock.acquire() @@ -2043,38 +2198,6 @@ class Worker(BaseClient): self.job_lock.release() -class BaseJob(object): - def __init__(self, name, arguments, unique=None, handle=None): - self._name = convert_to_bytes(name) - if (not isinstance(arguments, bytes) and - not isinstance(arguments, bytearray)): - raise TypeError("arguments must be of type bytes or bytearray") - self.arguments = arguments - self.unique = convert_to_bytes(unique) - self.handle = handle - self.connection = None - - @property - def name(self): - if isinstance(self._name, six.binary_type): - return self._name.decode('utf-8') - return self._name - - @name.setter - def name(self, value): - if isinstance(value, six.text_type): - value = value.encode('utf-8') - self._name = value - - @property - def binary_name(self): - return self._name - - def __repr__(self): - return '' % ( - id(self), self.handle, self.name, self.unique) - - class Job(BaseJob): """A job to run or being run by Gearman. @@ -2097,7 +2220,9 @@ class Job(BaseJob): **data** (list of byte-arrays) The result data returned from Gearman. Each packet appends an element to the list. Depending on the nature of the data, the - elements may need to be concatenated before use. + elements may need to be concatenated before use. This is returned + as a snapshot copy of the data to prevent accidental attempts at + modification which will be lost. **exception** (bytes or None) Exception information returned from Gearman. None if no exception has been received. @@ -2127,10 +2252,12 @@ class Job(BaseJob): has been submitted to a Gearman server. """ + data_type = list + def __init__(self, name, arguments, unique=None): super(Job, self).__init__(name, arguments, unique) - self.data = [] - self.exception = None + self._data = self.data_type() + self._exception = None self.warning = False self.complete = False self.failure = False @@ -2140,99 +2267,181 @@ class Job(BaseJob): self.known = None self.running = None + @property + def binary_data(self): + for value in self._data: + if isinstance(value, six.text_type): + value = value.encode('utf-8') + yield value -class WorkerJob(BaseJob): - """A job that Gearman has assigned to a Worker. Not intended to - be instantiated directly, but rather returned by - :py:meth:`Worker.getJob`. + @property + def data(self): + return self._data - :arg str handle: The job handle assigned by gearman. - :arg str name: The name of the job. - :arg bytes arguments: The opaque data blob passed to the worker - as arguments. - :arg str unique: A byte string to uniquely identify the job to Gearman - (optional). + @data.setter + def data(self, value): + if not isinstance(value, self.data_type): + raise ValueError( + "data attribute must be {}".format(self.data_type)) + self._data = value - The following instance attributes are available: + @property + def exception(self): + return self._exception + + @exception.setter + def exception(self, value): + self._data = value + + +class TextJobArguments(object): + """Assumes utf-8 arguments in addition to name + + If one is always dealing in valid utf-8, using this job class relieves one + of the need to encode/decode constantly.""" + + def _validate_arguments(self, arguments): + pass + + @property + def arguments(self): + args = self._arguments + if isinstance(args, six.binary_type): + return args.decode('utf-8') + return args + + @arguments.setter + def arguments(self, value): + if not isinstance(value, six.binary_type): + value = value.encode('utf-8') + self._arguments = value + + +class TextJobUnique(object): + """Assumes utf-8 unique + + If one is always dealing in valid utf-8, using this job class relieves one + of the need to encode/decode constantly.""" + + @property + def unique(self): + unique = self._unique + if isinstance(unique, six.binary_type): + return unique.decode('utf-8') + return unique + + @unique.setter + def unique(self, value): + if not isinstance(value, six.binary_type): + value = value.encode('utf-8') + self._unique = value + + +class TextList(list): + def append(self, x): + if isinstance(x, six.binary_type): + x = x.decode('utf-8') + super(TextList, self).append(x) + + def extend(self, iterable): + def _iter(): + for value in iterable: + if isinstance(value, six.binary_type): + yield value.decode('utf-8') + else: + yield value + super(TextList, self).extend(_iter) + + def insert(self, i, x): + if isinstance(x, six.binary_type): + x = x.decode('utf-8') + super(TextList, self).insert(i, x) + + +class TextJob(TextJobArguments, TextJobUnique, Job): + """ Sends and receives UTF-8 arguments and data. + + Use this instead of Job when you only expect to send valid UTF-8 through + gearman. It will automatically encode arguments and work data as UTF-8, and + any jobs fetched from this worker will have their arguments and data + decoded assuming they are valid UTF-8, and thus return strings. + + Attributes and method signatures are thes ame as Job except as noted here: + + ** arguments ** (str) This will be returned as a string. + ** data ** (tuple of str) This will be returned as a tuble of strings. - **name** (str) - The name of the job. Assumed to be utf-8. - **arguments** (bytes) - The opaque data blob passed to the worker as arguments. - **unique** (str or None) - The unique ID of the job (if supplied). - **handle** (bytes) - The Gearman job handle. - **connection** (:py:class:`Connection` or None) - The connection associated with the job. Only set after the job - has been submitted to a Gearman server. """ - def __init__(self, handle, name, arguments, unique=None): - super(WorkerJob, self).__init__(name, arguments, unique, handle) + data_type = TextList - def sendWorkData(self, data=b''): + @property + def exception(self): + exception = self._exception + if isinstance(exception, six.binary_type): + return exception.decode('utf-8') + return exception + + @exception.setter + def exception(self, value): + if not isinstance(value, six.binary_type): + value = value.encode('utf-8') + self._exception = value + + +class TextWorkerJob(TextJobArguments, TextJobUnique, WorkerJob): + """ Sends and receives UTF-8 arguments and data. + + See TextJob. sendWorkData and sendWorkWarning accept strings + and will encode them as UTF-8. + """ + def sendWorkData(self, data=''): """Send a WORK_DATA packet to the client. - :arg bytes data: The data to be sent to the client (optional). + :arg str data: The data to be sent to the client (optional). """ + if isinstance(data, six.text_type): + data = data.encode('utf8') + return super(TextWorkerJob, self).sendWorkData(data) - data = self.handle + b'\x00' + data - p = Packet(constants.REQ, constants.WORK_DATA, data) - self.connection.sendPacket(p) - - def sendWorkWarning(self, data=b''): + def sendWorkWarning(self, data=''): """Send a WORK_WARNING packet to the client. - :arg bytes data: The data to be sent to the client (optional). + :arg str data: The data to be sent to the client (optional). """ - data = self.handle + b'\x00' + data - p = Packet(constants.REQ, constants.WORK_WARNING, data) - self.connection.sendPacket(p) + if isinstance(data, six.text_type): + data = data.encode('utf8') + return super(TextWorkerJob, self).sendWorkWarning(data) - def sendWorkStatus(self, numerator, denominator): - """Send a WORK_STATUS packet to the client. - - Sends a numerator and denominator that together represent the - fraction complete of the job. - - :arg numeric numerator: The numerator of the fraction complete. - :arg numeric denominator: The denominator of the fraction complete. - """ - - data = (self.handle + b'\x00' + - str(numerator).encode('utf8') + b'\x00' + - str(denominator).encode('utf8')) - p = Packet(constants.REQ, constants.WORK_STATUS, data) - self.connection.sendPacket(p) - - def sendWorkComplete(self, data=b''): + def sendWorkComplete(self, data=''): """Send a WORK_COMPLETE packet to the client. - :arg bytes data: The data to be sent to the client (optional). + :arg str data: The data to be sent to the client (optional). """ + if isinstance(data, six.text_type): + data = data.encode('utf8') + return super(TextWorkerJob, self).sendWorkComplete(data) - data = self.handle + b'\x00' + data - p = Packet(constants.REQ, constants.WORK_COMPLETE, data) - self.connection.sendPacket(p) - - def sendWorkFail(self): - "Send a WORK_FAIL packet to the client." - - p = Packet(constants.REQ, constants.WORK_FAIL, self.handle) - self.connection.sendPacket(p) - - def sendWorkException(self, data=b''): + def sendWorkException(self, data=''): """Send a WORK_EXCEPTION packet to the client. - :arg bytes data: The exception data to be sent to the client - (optional). + :arg str data: The data to be sent to the client (optional). """ - data = self.handle + b'\x00' + data - p = Packet(constants.REQ, constants.WORK_EXCEPTION, data) - self.connection.sendPacket(p) + if isinstance(data, six.text_type): + data = data.encode('utf8') + return super(TextWorkerJob, self).sendWorkException(data) + + +class TextWorker(Worker): + """ Sends and receives UTF-8 only. + + See TextJob. + + """ + + job_class = TextWorkerJob class BaseBinaryJob(object): @@ -3153,7 +3362,7 @@ class Server(BaseClientServer): self.sendNoJob(packet.connection) def sendJobAssignUniq(self, connection, job): - unique = job.unique + unique = job.binary_unique if not unique: unique = b'' data = b'\x00'.join((job.handle, job.name, unique, job.arguments)) diff --git a/gear/tests/test_functional.py b/gear/tests/test_functional.py index ea02ffe..e512f2f 100644 --- a/gear/tests/test_functional.py +++ b/gear/tests/test_functional.py @@ -16,6 +16,7 @@ import os import threading import time +import uuid from OpenSSL import crypto import fixtures @@ -149,5 +150,79 @@ class TestFunctional(tests.BaseTestCase): self.assertEqual('test', workerjob.name) +class TestFunctionalText(tests.BaseTestCase): + def setUp(self): + super(TestFunctionalText, self).setUp() + self.server = gear.Server(0) + self.client = gear.Client('client') + self.worker = gear.TextWorker('worker') + self.client.addServer('127.0.0.1', self.server.port) + self.worker.addServer('127.0.0.1', self.server.port) + self.client.waitForServer() + self.worker.waitForServer() + + def test_text_job(self): + self.worker.registerFunction('test') + + for jobcount in range(2): + job = gear.TextJob('test', 'testdata') + self.client.submitJob(job) + self.assertNotEqual(job.handle, None) + + workerjob = self.worker.getJob() + self.assertEqual(workerjob.handle, job.handle) + self.assertEqual(workerjob.arguments, 'testdata') + workerjob.sendWorkData('workdata') + workerjob.sendWorkComplete() + + for count in iterate_timeout(30, "job completion"): + if job.complete: + break + self.assertTrue(job.complete) + self.assertEqual(job.data, ['workdata']) + + def test_text_job_unique(self): + self.worker.registerFunction('test') + + for jobcount in range(2): + jobunique = uuid.uuid4().hex + job = gear.TextJob('test', 'testdata', unique=jobunique) + self.client.submitJob(job) + self.assertNotEqual(job.handle, None) + + workerjob = self.worker.getJob() + self.assertEqual(workerjob.handle, job.handle) + self.assertEqual(workerjob.arguments, 'testdata') + workerjob.sendWorkData('workdata') + workerjob.sendWorkComplete() + + for count in iterate_timeout(30, "job completion"): + if job.complete: + break + self.assertTrue(job.complete) + self.assertEqual(job.data, ['workdata']) + self.assertEqual(job.unique, jobunique) + self.assertEqual(workerjob.unique, jobunique) + + def test_text_job_exception(self): + self.worker.registerFunction('test') + + for jobcount in range(2): + job = gear.TextJob('test', 'testdata') + self.client.submitJob(job) + self.assertNotEqual(job.handle, None) + + workerjob = self.worker.getJob() + self.assertEqual(workerjob.handle, job.handle) + self.assertEqual(workerjob.arguments, 'testdata') + workerjob.sendWorkException('work failed') + + for count in iterate_timeout(30, "job completion"): + if job.complete: + break + self.assertTrue(job.complete) + self.assertEqual(job.exception, 'work failed') + + def load_tests(loader, in_tests, pattern): return testscenarios.load_tests_apply_scenarios(loader, in_tests, pattern)