Add simple Gearman server.

For testing, not for real use under load.

Change-Id: I9c84b1eea7d868e907b80b6edf60c49c172c356b
This commit is contained in:
James E. Blair 2013-05-02 17:38:47 -07:00
parent 772d256328
commit 61bd6ce570
2 changed files with 518 additions and 189 deletions

View File

@ -10,6 +10,9 @@ it simple, with a relatively thin abstraction of the Gearman protocol
itself. It should be easy to use to build a client or worker that
operates either synchronously or asynchronously.
The module also provides a simple Gearman server for use as a
convenience in unit tests. The server is not designed for production
use under load.
Client Example
--------------
@ -137,6 +140,20 @@ AdminRequest Objects
:inherited-members:
Server Usage
------------
A simple Gearman server is provided for convenience in unit testing,
but is not designed for production use at scale. It takes no
parameters other than the port number on which to listen.
Server Objects
^^^^^^^^^^^^^^
.. autoclass:: gear.Server
:members:
:inherited-members:
Common
------

View File

@ -121,6 +121,12 @@ class Connection(object):
data.
"""
if self.conn:
try:
self.conn.close()
except Exception:
pass
self.log.debug("Disconnected from %s port %s" % (self.host, self.port))
self._init()
@ -136,8 +142,12 @@ class Connection(object):
:arg Packet packet: The :py:class:`Packet` to send.
"""
self.log.debug("Sending packet: %s" % packet)
self.conn.send(packet.toBinary())
def _getAdminRequest(self):
return self.admin_requests.pop(0)
def readPacket(self):
"""Read one packet or administrative response from the server.
@ -161,7 +171,7 @@ class Connection(object):
admin = False
else:
admin = True
admin_request = self.admin_requests.pop(0)
admin_request = self._getAdminRequest()
packet += c
if admin:
if admin_request.isComplete(packet):
@ -369,10 +379,11 @@ class Packet(object):
return job
class BaseClient(object):
log = logging.getLogger("gear.BaseClient")
class BaseClientServer(object):
log = logging.getLogger("gear.BaseClientServer")
def __init__(self):
self.running = True
self.active_connections = []
self.inactive_connections = []
@ -391,9 +402,231 @@ class BaseClient(object):
target=self._doConnectLoop)
self.connect_thread.start()
def __repr__(self):
return '<gear.Client 0x%x>' % id(self)
def _doConnectLoop(self):
# Outer run method of the reconnection thread
while self.running:
self.connections_condition.acquire()
while self.running and not self.inactive_connections:
self.log.debug("Waiting for change in available servers "
"to reconnect")
self.connections_condition.wait()
self.connections_condition.release()
self.log.debug("Checking if servers need to be reconnected")
try:
if self.running and not self._connectLoop():
# Nothing happened
time.sleep(2)
except Exception:
self.log.exception("Exception in connect loop:")
def _connectLoop(self):
# Inner method of the reconnection loop, triggered by
# a connection change
success = False
for conn in self.inactive_connections[:]:
self.log.debug("Trying to reconnect %s" % conn)
try:
conn.reconnect()
except ConnectionError:
self.log.debug("Unable to connect to %s" % conn)
continue
except Exception:
self.log.exception("Exception while connecting to %s" % conn)
continue
try:
self._onConnect(conn)
except Exception:
self.log.exception("Exception while performing on-connect "
"tasks for %s" % conn)
continue
self.connections_condition.acquire()
self.inactive_connections.remove(conn)
self.active_connections.append(conn)
self.connections_condition.notifyAll()
os.write(self.wake_write, '1\n')
self.connections_condition.release()
try:
self._onActiveConnection(conn)
except Exception:
self.log.exception("Exception while performing active conn "
"tasks for %s" % conn)
success = True
return success
def _onConnect(self, conn):
# Called immediately after a successful (re-)connection
pass
def _onActiveConnection(self, conn):
# Called immediately after a connection is activated
pass
def _lostConnection(self, conn):
# Called as soon as a connection is detected as faulty. Remove
# it and return ASAP and let the connection thread deal with it.
self.log.debug("Marking %s as disconnected" % conn)
self.connections_condition.acquire()
jobs = conn.pending_jobs + conn.related_jobs.values()
self.active_connections.remove(conn)
self.inactive_connections.append(conn)
self.connections_condition.notifyAll()
self.connections_condition.release()
for job in jobs:
self.handleDisconnect(job)
def _doPollLoop(self):
# Outer run method of poll thread.
while self.running:
self.connections_condition.acquire()
while self.running and not self.active_connections:
self.log.debug("Waiting for change in available connections "
"to poll")
self.connections_condition.wait()
self.connections_condition.release()
try:
self._pollLoop()
except Exception:
self.log.exception("Exception in poll loop:")
def _pollLoop(self):
# Inner method of poll loop
self.log.debug("Preparing to poll")
poll = select.poll()
bitmask = (select.POLLIN | select.POLLERR |
select.POLLHUP | select.POLLNVAL)
# Reverse mapping of fd -> connection
conn_dict = {}
for conn in self.active_connections:
poll.register(conn.conn.fileno(), bitmask)
conn_dict[conn.conn.fileno()] = conn
# Register the wake pipe so that we can break if we need to
# reconfigure connections
poll.register(self.wake_read, bitmask)
while self.running:
self.log.debug("Polling %s connections" %
len(self.active_connections))
ret = poll.poll()
for fd, event in ret:
if fd == self.wake_read:
self.log.debug("Woken by pipe")
while True:
if os.read(self.wake_read, 1) == '\n':
break
return
if event & select.POLLIN:
self.log.debug("Processing input on %s" % conn)
p = conn_dict[fd].readPacket()
if p:
if isinstance(p, Packet):
self.handlePacket(p)
else:
self.handleAdminRequest(p)
else:
self.log.debug("Received no data on %s" % conn)
self._lostConnection(conn_dict[fd])
return
else:
self.log.debug("Received error event on %s" % conn)
self._lostConnection(conn_dict[fd])
return
def handlePacket(self, packet):
    """Handle a received packet.

    This method is called whenever a packet is received from any
    connection.  It normally calls the handle method appropriate
    for the specific packet.  Packets of an unrecognized type are
    logged as an error and otherwise ignored.

    :arg Packet packet: The :py:class:`Packet` that was received.
    """
    self.log.debug("Received packet %s" % packet)
    if packet.ptype == constants.JOB_CREATED:
        self.handleJobCreated(packet)
    elif packet.ptype == constants.WORK_COMPLETE:
        self.handleWorkComplete(packet)
    elif packet.ptype == constants.WORK_FAIL:
        self.handleWorkFail(packet)
    elif packet.ptype == constants.WORK_EXCEPTION:
        self.handleWorkException(packet)
    elif packet.ptype == constants.WORK_DATA:
        self.handleWorkData(packet)
    elif packet.ptype == constants.WORK_WARNING:
        self.handleWorkWarning(packet)
    elif packet.ptype == constants.WORK_STATUS:
        self.handleWorkStatus(packet)
    elif packet.ptype == constants.STATUS_RES:
        self.handleStatusRes(packet)
    elif packet.ptype == constants.JOB_ASSIGN_UNIQ:
        self.handleJobAssignUnique(packet)
    elif packet.ptype == constants.NO_JOB:
        self.handleNoJob(packet)
    elif packet.ptype == constants.NOOP:
        self.handleNoop(packet)
    elif packet.ptype == constants.SUBMIT_JOB:
        self.handleSubmitJob(packet)
    elif packet.ptype == constants.GRAB_JOB_UNIQ:
        self.handleGrabJobUniq(packet)
    elif packet.ptype == constants.PRE_SLEEP:
        self.handlePreSleep(packet)
    elif packet.ptype == constants.SET_CLIENT_ID:
        self.handleSetClientID(packet)
    elif packet.ptype == constants.CAN_DO:
        self.handleCanDo(packet)
    elif packet.ptype == constants.CANT_DO:
        self.handleCantDo(packet)
    elif packet.ptype == constants.RESET_ABILITIES:
        self.handleResetAbilities(packet)
    else:
        # The format string needs a placeholder; without one the "%"
        # operator raises TypeError instead of logging the packet.
        self.log.error("Received unknown packet: %s" % packet)
def handleAdminRequest(self, request):
    """Handle a completed administrative command exchange.

    Called by the poll loop when ``readPacket`` returns something
    other than a :py:class:`Packet`.  The request is logged and then
    marked complete via its ``setComplete`` method.

    :arg AdminRequest request: The :py:class:`AdminRequest` that
        initiated the received response.
    """
    message = "Received admin data %s" % request
    self.log.debug(message)
    request.setComplete()
def shutdown(self):
    """Close all connections and stop all running threads.

    Runs the two shutdown phases in order: ``_shutdown`` followed by
    ``_cleanup``.  The object may no longer be used after shutdown is
    called.
    """
    for phase in (self._shutdown, self._cleanup):
        phase()
def _shutdown(self):
    """First phase of shutdown: tell all threads to exit.

    Clears the running flag, notifies everything waiting on the
    connections condition, and writes to the wake pipe so that the
    poll loop returns.
    """
    self.running = False
    condition = self.connections_condition
    condition.acquire()
    condition.notifyAll()
    os.write(self.wake_write, '1\n')
    condition.release()
def _cleanup(self):
# The second part of the shutdown process where we wait for all
# threads to exit and then clean up.
self.poll_thread.join()
self.connect_thread.join()
for connection in self.active_connections:
connection.disconnect()
self.active_connections = []
self.inactive_connections = []
class BaseClient(BaseClientServer):
def addServer(self, host, port=4730):
"""Add a server to the client's connection pool.
@ -434,9 +667,9 @@ class BaseClient(object):
Block until at least one gearman server is connected.
"""
connected = False
while True:
while self.running:
self.connections_condition.acquire()
while not self.active_connections:
while self.running and not self.active_connections:
self.log.debug("Waiting for at least one active connection")
self.connections_condition.wait()
if self.active_connections:
@ -446,70 +679,6 @@ class BaseClient(object):
if connected:
return
def _doConnectLoop(self):
# Outer run method of the reconnection thread
while True:
self.connections_condition.acquire()
while not self.inactive_connections:
self.log.debug("Waiting for change in available servers "
"to reconnect")
self.connections_condition.wait()
self.connections_condition.release()
self.log.debug("Checking if servers need to be reconnected")
try:
if not self._connectLoop():
# Nothing happened
time.sleep(2)
except Exception:
self.log.exception("Exception in connect loop:")
def _connectLoop(self):
# Inner method of the reconnection loop, triggered by
# a connection change
success = False
for conn in self.inactive_connections[:]:
self.log.debug("Trying to reconnect %s" % conn)
try:
conn.reconnect()
except ConnectionError:
self.log.debug("Unable to connect to %s" % conn)
continue
except Exception:
self.log.exception("Exception while connecting to %s" % conn)
continue
try:
self._onConnect(conn)
except Exception:
self.log.exception("Exception while performing on-connect "
"tasks for %s" % conn)
continue
self.connections_condition.acquire()
self.inactive_connections.remove(conn)
self.active_connections.append(conn)
self.connections_condition.notifyAll()
os.write(self.wake_write, '1\n')
self.connections_condition.release()
success = True
return success
def _onConnect(self, conn):
# Called immediately after a successful (re-)connection
pass
def _lostConnection(self, conn):
# Called as soon as a connection is detected as faulty. Remove
# it and return ASAP and let the connection thread deal with it.
self.log.debug("Marking %s as disconnected" % conn)
self.connections_condition.acquire()
jobs = conn.pending_jobs + conn.related_jobs.values()
self.active_connections.remove(conn)
self.inactive_connections.append(conn)
self.connections_condition.notifyAll()
self.connections_condition.release()
for job in jobs:
self.handleDisconnect(job)
def getConnection(self):
"""Return a connected server.
@ -539,62 +708,6 @@ class BaseClient(object):
self.connections_condition.release()
return conn
def _doPollLoop(self):
# Outer run method of poll thread.
while True:
self.connections_condition.acquire()
while not self.active_connections:
self.log.debug("Waiting for change in available servers "
"to poll")
self.connections_condition.wait()
self.connections_condition.release()
try:
self._pollLoop()
except Exception:
self.log.exception("Exception in poll loop:")
def _pollLoop(self):
# Inner method of poll loop
self.log.debug("Preparing to poll")
poll = select.poll()
bitmask = (select.POLLIN | select.POLLERR |
select.POLLHUP | select.POLLNVAL)
# Reverse mapping of fd -> connection
conn_dict = {}
for conn in self.active_connections:
poll.register(conn.conn.fileno(), bitmask)
conn_dict[conn.conn.fileno()] = conn
# Register the wake pipe so that we can break if we need to
# reconfigure connections
poll.register(self.wake_read, bitmask)
while True:
self.log.debug("Polling %s connections" %
len(self.active_connections))
ret = poll.poll()
for fd, event in ret:
if fd == self.wake_read:
self.log.debug("Woken by pipe")
while True:
if os.read(self.wake_read, 1) == '\n':
break
return
if event & select.POLLIN:
self.log.debug("Processing input on %s" % conn)
p = conn_dict[fd].readPacket()
if p:
if isinstance(p, Packet):
self.handlePacket(p)
else:
self.handleAdminResponse(p)
else:
self.log.debug("Received no data on %s" % conn)
self._lostConnection(conn_dict[fd])
return
else:
self.log.debug("Received error event on %s" % conn)
self._lostConnection(conn_dict[fd])
return
def broadcast(self, packet):
"""Send a packet to all currently connected servers.
@ -626,21 +739,6 @@ class BaseClient(object):
self._lostConnection(connection)
raise
def handleAdminResponse(self, request):
"""Handle an administrative command response from Gearman.
This method is called whenever a response to a previously
issued administrative command is received from one of this
client's connections. It normally releases the wait lock on
the initiating AdminRequest object.
:arg AdminRequest request: The :py:class:`AdminRequest` that
initiated the received response.
"""
self.log.debug("Received admin response %s" % request)
request.setComplete()
class Client(BaseClient):
"""A Gearman client.
@ -653,6 +751,9 @@ class Client(BaseClient):
log = logging.getLogger("gear.Client")
def __repr__(self):
return '<gear.Client 0x%x>' % id(self)
def submitJob(self, job, background=False, precedence=PRECEDENCE_NORMAL):
"""Submit a job to a Gearman server.
@ -709,34 +810,6 @@ class Client(BaseClient):
# try again
self._lostConnection(conn)
def handlePacket(self, packet):
"""Handle a packet received from a Gearman server.
This method is called whenever a packet is received from any
of this client's connections. It normally calls the handle
method appropriate for the specific packet.
:arg Packet packet: The :py:class:`Packet` that was received.
"""
self.log.debug("Received packet %s" % packet)
if packet.ptype == constants.JOB_CREATED:
self.handleJobCreated(packet)
elif packet.ptype == constants.WORK_COMPLETE:
self.handleWorkComplete(packet)
elif packet.ptype == constants.WORK_FAIL:
self.handleWorkFail(packet)
elif packet.ptype == constants.WORK_EXCEPTION:
self.handleWorkException(packet)
elif packet.ptype == constants.WORK_DATA:
self.handleWorkData(packet)
elif packet.ptype == constants.WORK_WARNING:
self.handleWorkWarning(packet)
elif packet.ptype == constants.WORK_STATUS:
self.handleWorkStatus(packet)
elif packet.ptype == constants.STATUS_RES:
self.handleStatusRes(packet)
def handleJobCreated(self, packet):
"""Handle a JOB_CREATED packet.
@ -946,6 +1019,9 @@ class Worker(BaseClient):
self.job_queue = Queue.Queue()
super(Worker, self).__init__()
def __repr__(self):
return '<gear.Worker 0x%x>' % id(self)
def registerFunction(self, name, timeout=None):
"""Register a function with Gearman.
@ -1038,6 +1114,14 @@ class Worker(BaseClient):
# Any exceptions will be handled by the calling function, and the
# connection will not be put into the pool.
def _onActiveConnection(self, conn):
self.job_lock.acquire()
try:
if self.waiting_for_jobs > 0:
self._updateStateMachines()
finally:
self.job_lock.release()
def _updateStateMachines(self):
connections = self.active_connections[:]
@ -1097,6 +1181,8 @@ class Worker(BaseClient):
ok = True
for connection in connections:
if connection.state == "GRAB_WAIT":
# Replies to GRAB_JOB should be fast, give up if we've
# been waiting for more than 5 seconds.
if now - connection.state_time > 5:
self._lostConnection(connection)
else:
@ -1115,6 +1201,10 @@ class Worker(BaseClient):
self._updateStateMachines()
self.job_lock.release()
def _shutdown(self):
    """Run the parent class's first shutdown phase, then stop waiting for jobs."""
    parent = super(Worker, self)
    parent._shutdown()
    self.stopWaitingForJobs()
def handleNoop(self, packet):
"""Handle a NOOP packet.
@ -1188,25 +1278,6 @@ class Worker(BaseClient):
finally:
self.job_lock.release()
def handlePacket(self, packet):
"""Handle a packet received from a Gearman server.
This method is called whenever a packet is received from any
of this worker's connections. It normally calls the handle
method appropriate for the specific packet.
:arg Packet packet: The :py:class:`Packet` that was received.
"""
self.log.debug("Received packet %s" % packet)
if packet.ptype == constants.JOB_ASSIGN_UNIQ:
self.handleJobAssignUnique(packet)
elif packet.ptype == constants.NO_JOB:
self.handleNoJob(packet)
elif packet.ptype == constants.NOOP:
self.handleNoop(packet)
class BaseJob(object):
log = logging.getLogger("gear.Job")
@ -1396,3 +1467,244 @@ class WorkerJob(BaseJob):
data = self.handle + '\x00' + data
p = Packet(constants.REQ, constants.WORK_EXCEPTION, data)
self.connection.sendPacket(p)
# Below are classes for use in the server implementation:
class ServerAdminRequest(AdminRequest):
    """An administrative request sent to a server."""

    # A request is finished once the data contains a newline-terminated
    # line (an optional carriage return may precede the newline).
    finished_re = re.compile('^.*\r?\n', re.M)

    def __init__(self, connection):
        super(ServerAdminRequest, self).__init__()
        # The connection on which the request arrived.
        self.connection = connection

    def isComplete(self, data):
        """Report whether ``data`` holds a complete command line.

        When it does, the data with surrounding whitespace stripped is
        stored in ``self.command``.

        :arg data: The data received so far.
        :returns: True if a complete line was found, False otherwise.
        """
        if not self.finished_re.search(data):
            return False
        self.command = data.strip()
        return True
class ServerConnection(Connection):
    """A Connection to a Gearman Client."""

    def __init__(self, addr, conn):
        # addr is indexed as (host, port, ...); conn is the object
        # stored as this connection's ``conn`` attribute.
        self.host, self.port = addr[0], addr[1]
        self.conn = conn
        self.max_handle = 0
        self.client_id = None
        # Function names registered on this connection.
        self.functions = set()
        self.changeState("INIT")

    def _getAdminRequest(self):
        """Return a new :py:class:`ServerAdminRequest` bound to this connection."""
        return ServerAdminRequest(self)

    def __repr__(self):
        # Prefer the client id when one is set; otherwise fall back to
        # the object's id in hex.
        name = self.client_id or '0x%x' % id(self)
        return '<gear.Connection name: %s host: %s port: %s>' % (
            name, self.host, self.port)
class Server(BaseClientServer):
    """A simple gearman server implementation for testing
    (not for production use).

    :arg int port: The TCP port on which to listen.
    """

    def __init__(self, port=4730):
        self.port = port
        # Server-wide counter used to build job handles, so that two
        # connections from the same host never share a handle.
        self.max_handle = 0
        # Jobs waiting to be grabbed, in submission order.
        self.queue = []
        # All known jobs, keyed by handle.
        self.jobs = {}
        # Pipe used to wake the accept loop at shutdown.
        self.connect_wake_read, self.connect_wake_write = os.pipe()
        # Stays None if no address yields a usable listening socket.
        self.socket = None
        for res in socket.getaddrinfo(None, self.port, socket.AF_UNSPEC,
                                      socket.SOCK_STREAM, 0,
                                      socket.AI_PASSIVE):
            af, socktype, proto, canonname, sa = res
            try:
                self.socket = socket.socket(af, socktype, proto)
                self.socket.setsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR, 1)
            except socket.error:
                self.socket = None
                continue
            try:
                self.socket.bind(sa)
                self.socket.listen(1)
            except socket.error:
                self.socket.close()
                self.socket = None
                continue
            break

        if self.socket is None:
            raise Exception("Could not open socket")

        super(Server, self).__init__()

    def _doConnectLoop(self):
        # Outer run method of the accept thread.
        while self.running:
            try:
                self.connectLoop()
            except Exception:
                self.log.exception("Exception in connect loop:")

    def connectLoop(self):
        """Accept incoming connections until woken by the shutdown pipe.

        Each accepted socket is wrapped in a
        :py:class:`ServerConnection`, added to the active connections,
        and the poll loop is woken so that it starts polling it.
        """
        poll = select.poll()
        bitmask = (select.POLLIN | select.POLLERR |
                   select.POLLHUP | select.POLLNVAL)
        # Register the wake pipe so that we can break if we need to
        # shutdown.
        poll.register(self.connect_wake_read, bitmask)
        poll.register(self.socket.fileno(), bitmask)
        while self.running:
            ret = poll.poll()
            for fd, event in ret:
                if fd == self.connect_wake_read:
                    self.log.debug("Accept woken by pipe")
                    while True:
                        if os.read(self.connect_wake_read, 1) == '\n':
                            break
                    return
                if event & select.POLLIN:
                    self.log.debug("Accepting new connection")
                    c, addr = self.socket.accept()
                    self.log.debug("Accepted new connection")
                    conn = ServerConnection(addr, c)
                    self.connections_condition.acquire()
                    self.active_connections.append(conn)
                    self.connections_condition.notifyAll()
                    os.write(self.wake_write, '1\n')
                    self.connections_condition.release()

    def _shutdown(self):
        super(Server, self)._shutdown()
        # Also wake the accept loop so that it can exit.
        os.write(self.connect_wake_write, '1\n')

    def _cleanup(self):
        super(Server, self)._cleanup()
        self.socket.close()

    def _lostConnection(self, conn):
        # Called as soon as a connection is detected as faulty.  The
        # server does not reconnect to clients, so the connection is
        # simply dropped from the active list.
        self.log.debug("Marking %s as disconnected" % conn)
        self.connections_condition.acquire()
        try:
            # Tolerate a connection that was already removed instead
            # of raising while the lock is held.
            if conn in self.active_connections:
                self.active_connections.remove(conn)
            self.connections_condition.notifyAll()
        finally:
            self.connections_condition.release()

    def handleAdminRequest(self, request):
        """Dispatch an administrative command received from a client.

        Only ``cancel job`` is acted on; other commands are ignored.
        """
        if request.command.startswith('cancel job'):
            self.handleCancelJob(request)

    def handleCancelJob(self, request):
        """Handle a ``cancel job <handle>`` administrative command.

        Replies ``OK`` after forgetting a known job, or
        ``ERR UNKNOWN_JOB`` when the handle is unknown or missing.
        """
        words = request.command.split()
        # A command without a handle is treated as an unknown job.
        handle = words[2] if len(words) > 2 else None
        if handle in self.jobs:
            # Filter in place rather than removing while iterating.
            self.queue[:] = [job for job in self.queue
                             if job.handle != handle]
            del self.jobs[handle]
            request.connection.conn.send("OK\n")
            return
        request.connection.conn.send("ERR UNKNOWN_JOB\n")

    def wakeConnections(self):
        """Send a NOOP packet to every connection in the SLEEP state."""
        # NOTE(review): packets originating from a server normally
        # carry the RES magic code; confirm that REQ is intended for
        # the packets this class sends.
        p = Packet(constants.REQ, constants.NOOP, '')
        for connection in self.active_connections:
            if connection.state == 'SLEEP':
                connection.sendPacket(p)

    def handleSubmitJob(self, packet):
        """Handle a SUBMIT_JOB packet: queue the job and acknowledge it.

        A JOB_CREATED packet carrying the new handle is sent back on
        the submitting connection, and sleeping connections are woken.
        """
        name = packet.getArgument(0)
        unique = packet.getArgument(1)
        if not unique:
            unique = None
        arguments = packet.getArgument(2)
        # Use the server-wide counter: a per-connection counter lets
        # two clients on the same host produce identical handles.
        self.max_handle += 1
        handle = 'H:%s:%s' % (packet.connection.host,
                              str(self.max_handle))
        job = BaseJob(name, arguments, unique, handle)
        job.connection = packet.connection
        p = Packet(constants.REQ, constants.JOB_CREATED, handle)
        packet.connection.sendPacket(p)
        self.jobs[handle] = job
        self.queue.append(job)
        self.wakeConnections()

    def handleGrabJobUniq(self, packet):
        """Handle a GRAB_JOB_UNIQ packet.

        Assigns the first queued job whose name is among the functions
        registered on the requesting connection, or replies NO_JOB.
        """
        job = None
        for j in self.queue:
            if j.name in packet.connection.functions:
                job = j
                # Safe: the loop exits right after the removal.
                self.queue.remove(j)
                break
        if job:
            unique = job.unique
            if not unique:
                unique = ''
            data = '%s\x00%s\x00%s\x00%s' % (job.handle, job.name,
                                             unique, job.arguments)
            p = Packet(constants.REQ, constants.JOB_ASSIGN_UNIQ, data)
            packet.connection.sendPacket(p)
        else:
            p = Packet(constants.REQ, constants.NO_JOB, "")
            packet.connection.sendPacket(p)

    def handlePreSleep(self, packet):
        packet.connection.changeState("SLEEP")

    # The WORK_* handlers forward the packet to the connection that
    # submitted the job.  The first three also end the job.

    def handleWorkComplete(self, packet):
        self.handlePassthrough(packet, True)

    def handleWorkFail(self, packet):
        self.handlePassthrough(packet, True)

    def handleWorkException(self, packet):
        self.handlePassthrough(packet, True)

    def handleWorkData(self, packet):
        self.handlePassthrough(packet)

    def handleWorkWarning(self, packet):
        self.handlePassthrough(packet)

    def handleWorkStatus(self, packet):
        self.handlePassthrough(packet)

    def handlePassthrough(self, packet, finished=False):
        """Forward a packet to the connection that submitted its job.

        :arg Packet packet: The packet to forward; its first argument
            is the job handle.
        :arg bool finished: Whether to forget the job afterwards.
        :raises UnknownJobError: If the handle is not a known job.
        """
        handle = packet.getArgument(0)
        job = self.jobs.get(handle)
        if not job:
            raise UnknownJobError()
        job.connection.sendPacket(packet)
        if finished:
            del self.jobs[handle]

    def handleSetClientID(self, packet):
        name = packet.getArgument(0)
        packet.connection.client_id = name

    def handleCanDo(self, packet):
        name = packet.getArgument(0)
        self.log.debug("Adding function %s to %s" % (name, packet.connection))
        packet.connection.functions.add(name)

    def handleCantDo(self, packet):
        name = packet.getArgument(0)
        self.log.debug("Removing function %s from %s" %
                       (name, packet.connection))
        # discard() tolerates a function that was never registered.
        packet.connection.functions.discard(name)

    def handleResetAbilities(self, packet):
        self.log.debug("Resetting functions for %s" % packet.connection)
        packet.connection.functions = set()