min switch gearman to gear worker

Change-Id: I0dad4364f15a81ee17d4a1e1aa6585364b14a439
This commit is contained in:
Min Wang 2014-05-28 16:01:49 -07:00
parent 2300c8952a
commit 05f491043b
2 changed files with 16 additions and 35 deletions

View File

@ -12,14 +12,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import gearman.errors
import json
import socket
import time
import gear
from oslo.config import cfg
from libra.common.json_gearman import JSONGearmanWorker
from libra.worker.controller import LBaaSController
from libra.openstack.common import log
@ -27,7 +23,7 @@ from libra.openstack.common import log
LOG = log.getLogger(__name__)
class CustomJSONGearmanWorker(JSONGearmanWorker):
class CustomJSONGearmanWorker(gear.Worker):
""" Custom class we will use to pass arguments to the Gearman task. """
driver = None
@ -43,13 +39,13 @@ def handler(worker, job):
driver = worker.driver
# Hide information that should not be logged
copy = job.data.copy()
copy = json.loads(job.arguments)
if LBaaSController.OBJ_STORE_TOKEN_FIELD in copy:
copy[LBaaSController.OBJ_STORE_TOKEN_FIELD] = "*****"
LOG.debug("Received JSON message: %s" % json.dumps(copy))
controller = LBaaSController(driver, job.data)
controller = LBaaSController(driver, json.loads(job.arguments))
response = controller.run()
# Hide information that should not be logged
@ -58,7 +54,7 @@ def handler(worker, job):
copy[LBaaSController.OBJ_STORE_TOKEN_FIELD] = "*****"
LOG.debug("Return JSON message: %s" % json.dumps(copy))
return copy
job.sendWorkComplete(json.dumps(copy))
def config_thread(driver):
@ -66,39 +62,23 @@ def config_thread(driver):
# Hostname should be a unique value, like UUID
hostname = socket.gethostname()
LOG.info("Registering task %s" % hostname)
server_list = []
worker = CustomJSONGearmanWorker(hostname)
for host_port in cfg.CONF['gearman']['servers']:
host, port = host_port.split(':')
server_list.append({'host': host,
'port': int(port),
'keyfile': cfg.CONF['gearman']['ssl_key'],
'certfile': cfg.CONF['gearman']['ssl_cert'],
'ca_certs': cfg.CONF['gearman']['ssl_ca'],
'keepalive': cfg.CONF['gearman']['keepalive'],
'keepcnt': cfg.CONF['gearman']['keepcnt'],
'keepidle': cfg.CONF['gearman']['keepidle'],
'keepintvl': cfg.CONF['gearman']['keepintvl']})
worker = CustomJSONGearmanWorker(server_list)
worker.set_client_id(hostname)
worker.register_task(hostname, handler)
worker.logger = LOG
worker.addServer(host, port, cfg.CONF['gearman']['ssl_key'],
cfg.CONF['gearman']['ssl_cert'],
cfg.CONF['gearman']['ssl_ca'])
worker.registerFunction(hostname)
worker.log = LOG
worker.driver = driver
retry = True
while (retry):
while retry:
try:
worker.work(cfg.CONF['gearman']['poll'])
job = worker.getJob()
handler(worker, job)
except KeyboardInterrupt:
retry = False
except gearman.errors.ServerUnavailable:
LOG.error("Job server(s) went away. Reconnecting.")
time.sleep(cfg.CONF['gearman']['reconnect_sleep'])
retry = True
except Exception as e:
LOG.critical("Exception: %s, %s" % (e.__class__, e))
retry = False
LOG.debug("Worker process terminated.")

View File

@ -2,6 +2,7 @@ pbr>=0.5.21,<1.0
Babel>=0.9.6
eventlet
gear
gearman>=2.0.2
oslo.config>=1.2.0
python-daemon>=1.6
@ -14,5 +15,5 @@ sqlalchemy>=0.8.0
wsme>=0.5b2
mysql-connector-python
ipaddress==1.0.4
six<1.4.0
six
kombu