backporting...

Change-Id: I894cfb152297dd175105d8f35023a090bc5d8bb5
This commit is contained in:
German Eichberger 2014-06-23 11:26:01 -07:00 committed by Jamie McCarthy
parent 43e8447bdd
commit 80e7bae30c
9 changed files with 387 additions and 384 deletions

View File

@ -16,14 +16,14 @@ import ipaddress
import threading
from datetime import datetime
from gearman.constants import JOB_UNKNOWN
from oslo.config import cfg
from sqlalchemy import func
from libra.common.api.lbaas import Device, PoolBuilding, Vip, db_session
from libra.common.api.lbaas import Counters
from libra.common.json_gearman import JsonJob
from libra.common.json_gearman import JSONGearmanClient
from libra.openstack.common import log
import gear
# TODO: Lots of duplication of code here, need to cleanup
@ -31,6 +31,7 @@ LOG = log.getLogger(__name__)
class Pool(object):
DELETE_SECONDS = cfg.CONF['admin_api'].delete_timer_seconds
PROBE_SECONDS = cfg.CONF['admin_api'].probe_timer_seconds
VIPS_SECONDS = cfg.CONF['admin_api'].vips_timer_seconds
@ -48,8 +49,6 @@ class Pool(object):
self.start_probe_sched()
self.start_vips_sched()
self.gear = GearmanWork() # set up the async gearman
def shutdown(self):
if self.probe_timer:
self.probe_timer.cancel()
@ -67,8 +66,9 @@ class Pool(object):
return
LOG.info('Running device delete check')
try:
message = []
with db_session() as session:
devices = session.query(Device). \
devices = session.query(Device).\
filter(Device.status == 'DELETED').all()
for device in devices:
@ -76,12 +76,17 @@ class Pool(object):
'action': 'DELETE_DEVICE',
'name': device.name
}
self.gear.send_delete_message(job_data)
message.append(dict(task='libra_pool_mgm', data=job_data))
counter = session.query(Counters). \
counter = session.query(Counters).\
filter(Counters.name == 'devices_deleted').first()
counter.value += len(devices)
session.commit()
if not message:
LOG.info("No devices to delete")
else:
gear = GearmanWork()
gear.send_delete_message(message)
except:
LOG.exception("Exception when deleting devices")
@ -97,7 +102,7 @@ class Pool(object):
try:
with db_session() as session:
NULL = None # For pep8
vip_count = session.query(Vip). \
vip_count = session.query(Vip).\
filter(Vip.device == NULL).count()
if vip_count >= self.vip_pool_size:
LOG.info("Enough vips exist, no work to do")
@ -123,11 +128,11 @@ class Pool(object):
try:
with db_session() as session:
# Double check we have no outstanding builds assigned to us
session.query(PoolBuilding). \
filter(PoolBuilding.server_id == self.server_id). \
session.query(PoolBuilding).\
filter(PoolBuilding.server_id == self.server_id).\
delete()
session.flush()
dev_count = session.query(Device). \
dev_count = session.query(Device).\
filter(Device.status == 'OFFLINE').count()
if dev_count >= self.node_pool_size:
LOG.info("Enough devices exist, no work to do")
@ -159,8 +164,8 @@ class Pool(object):
# for a long time locking tables
self._build_nodes(build_count)
with db_session() as session:
session.query(PoolBuilding). \
filter(PoolBuilding.server_id == self.server_id). \
session.query(PoolBuilding).\
filter(PoolBuilding.server_id == self.server_id).\
delete()
session.commit()
except:
@ -168,14 +173,24 @@ class Pool(object):
self.start_probe_sched()
def _build_nodes(self, count):
message = []
it = 0
job_data = {'action': 'BUILD_DEVICE'}
for it in range(0, count):
self.gear.send_create_message(job_data)
while it < count:
message.append(dict(task='libra_pool_mgm', data=job_data))
it += 1
gear = GearmanWork()
gear.send_create_message(message)
def _build_vips(self, count):
message = []
it = 0
job_data = {'action': 'BUILD_IP'}
for it in range(0, count):
self.gear.send_vips_message(job_data)
while it < count:
message.append(dict(task='libra_pool_mgm', data=job_data))
it += 1
gear = GearmanWork()
gear.send_vips_message(message)
def start_probe_sched(self):
seconds = datetime.now().second
@ -213,126 +228,160 @@ class Pool(object):
class GearmanWork(object):
class VIPClient(gear.Client):
def handleWorkComplete(self, packet):
job = super(GearmanWork.VIPClient, self).handleWorkComplete(packet)
def __init__(self):
server_list = []
for server in cfg.CONF['gearman']['servers']:
host, port = server.split(':')
server_list.append({'host': host,
'port': int(port),
'keyfile': cfg.CONF['gearman']['ssl_key'],
'certfile': cfg.CONF['gearman']['ssl_cert'],
'ca_certs': cfg.CONF['gearman']['ssl_ca'],
'keepalive': cfg.CONF['gearman']['keepalive'],
'keepcnt': cfg.CONF['gearman']['keepcnt'],
'keepidle': cfg.CONF['gearman']['keepidle'],
'keepintvl': cfg.CONF['gearman']['keepintvl']
})
self.gearman_client = JSONGearmanClient(server_list)
def send_delete_message(self, message):
LOG.info("Sending %d gearman messages", len(message))
job_status = self.gearman_client.submit_multiple_jobs(
message, background=False, wait_until_complete=True,
max_retries=10, poll_timeout=30.0
)
delete_count = 0
for status in job_status:
if status.state == JOB_UNKNOWN:
LOG.error('Gearman Job server fail')
continue
if status.timed_out:
LOG.error('Gearman timeout whilst deleting device')
continue
if status.result['response'] == 'FAIL':
LOG.error(
'Pool manager failed to delete a device, removing from DB'
)
delete_count += 1
with db_session() as session:
session.query(Device).\
filter(Device.name == status.result['name']).delete()
session.commit()
LOG.info('%d freed devices delete from pool', delete_count)
def send_vips_message(self, message):
# TODO: make this gearman part more async, not wait for all builds
LOG.info("Sending %d gearman messages", len(message))
job_status = self.gearman_client.submit_multiple_jobs(
message, background=False, wait_until_complete=True,
max_retries=10, poll_timeout=3600.0
)
built_count = 0
for status in job_status:
if status.state == JOB_UNKNOWN:
LOG.error('Gearman Job server fail')
continue
if status.timed_out:
LOG.error('Gearman timeout whilst building vip')
continue
if status.result['response'] == 'FAIL':
LOG.error('Pool manager failed to build a vip')
continue
built_count += 1
try:
if job.msg['response'] == 'FAIL':
LOG.error('Pool manager failed to build a vip')
else:
self._add_vip(job.msg)
self._add_vip(status.result)
except:
LOG.exception(
'Could not add vip to DB, node data: {0}'
.format(job.msg)
.format(status.result)
)
LOG.info(
'{vips} vips built and added to pool'.format(vips=built_count)
)
def _add_vip(self, data):
LOG.info('Adding vip {0} to DB'.format(data['ip']))
vip = Vip()
vip.ip = int(ipaddress.IPv4Address(unicode(data['ip'])))
with db_session() as session:
session.add(vip)
counter = session.query(Counters). \
filter(Counters.name == 'vips_built').first()
counter.value += 1
session.commit()
def send_create_message(self, message):
# TODO: make this gearman part more async, not wait for all builds
LOG.info("Sending {0} gearman messages".format(len(message)))
job_status = self.gearman_client.submit_multiple_jobs(
message, background=False, wait_until_complete=True,
max_retries=10, poll_timeout=3600.0
)
built_count = 0
for status in job_status:
if status.state == JOB_UNKNOWN:
LOG.error('Gearman Job server fail')
continue
if status.timed_out:
LOG.error('Gearman timeout whilst building device')
continue
if status.result['response'] == 'FAIL':
LOG.error('Pool manager failed to build a device')
if 'name' in status.result:
self._add_bad_node(status.result)
continue
class DeleteClient(gear.Client):
def handleWorkComplete(self, packet):
job = super(GearmanWork.DeleteClient,
self).handleWorkComplete(packet)
if job.msg['response'] == 'FAIL':
LOG.error(
'Pool manager failed to delete a device, removing from DB')
self._delete_from_db(job.msg)
def _delete_from_db(self, msg):
with db_session() as session:
session.query(Device). \
filter(Device.name == msg['name']).delete()
session.commit()
LOG.info("Delete device %s" % msg['name'])
class CreateClient(gear.Client):
def handleWorkComplete(self, packet):
job = super(GearmanWork.CreateClient,
self).handleWorkComplete(packet)
built_count += 1
try:
if job.msg['response'] == 'FAIL':
LOG.error('Pool manager failed to build a device')
if 'name' in job.msg:
self._add_bad_node(job.msg)
else:
self._add_node(job.msg)
self._add_node(status.result)
except:
LOG.exception(
'Could not add node to DB, node data: {0}'
.format(job.msg)
.format(status.result)
)
LOG.info(
'{nodes} devices built and added to pool'.format(nodes=built_count)
)
def _add_node(self, data):
LOG.info('Adding device {0} to DB'.format(data['name']))
device = Device()
device.name = data['name']
device.publicIpAddr = data['addr']
# TODO: kill this field, make things use publicIpAddr instead
device.floatingIpAddr = data['addr']
device.az = data['az']
device.type = data['type']
device.pingCount = 0
device.status = 'OFFLINE'
device.created = None
with db_session() as session:
session.add(device)
counter = session.query(Counters). \
filter(Counters.name == 'devices_built').first()
counter.value += 1
session.commit()
def _add_vip(self, data):
LOG.info('Adding vip {0} to DB'.format(data['ip']))
vip = Vip()
vip.ip = int(ipaddress.IPv4Address(unicode(data['ip'])))
with db_session() as session:
session.add(vip)
counter = session.query(Counters).\
filter(Counters.name == 'vips_built').first()
counter.value += 1
session.commit()
def _add_bad_node(self, data):
LOG.info(
"Adding bad device {0} to DB to be deleted" % (data['name']))
device = Device()
device.name = data['name']
device.publicIpAddr = data['addr']
# TODO: kill this field, make things use publicIpAddr instead
device.floatingIpAddr = data['addr']
device.az = data['az']
device.type = data['type']
device.pingCount = 0
device.status = 'DELETED'
device.created = None
with db_session() as session:
session.add(device)
counter = session.query(Counters). \
filter(Counters.name == 'devices_bad_built').first()
counter.value += 1
session.commit()
def _add_node(self, data):
LOG.info('Adding device {0} to DB'.format(data['name']))
device = Device()
device.name = data['name']
device.publicIpAddr = data['addr']
# TODO: kill this field, make things use publicIpAddr instead
device.floatingIpAddr = data['addr']
device.az = data['az']
device.type = data['type']
device.pingCount = 0
device.status = 'OFFLINE'
device.created = None
with db_session() as session:
session.add(device)
counter = session.query(Counters).\
filter(Counters.name == 'devices_built').first()
counter.value += 1
session.commit()
def __init__(self):
self.vip_client = GearmanWork.VIPClient("Vip Client")
self.delete_client = GearmanWork.DeleteClient("Delete Client")
self.create_client = GearmanWork.CreateClient("Create Client")
for x in [self.vip_client, self.create_client, self.delete_client]:
self._init_client(x)
def _init_client(self, client):
client.log = LOG
for server in cfg.CONF['gearman']['servers']:
host, port = server.split(':')
client.addServer(host, port, cfg.CONF['gearman']['ssl_key'],
cfg.CONF['gearman']['ssl_cert'],
cfg.CONF['gearman']['ssl_ca'])
def send_delete_message(self, message, name='libra_pool_mgm'):
self.delete_client.submitJob(JsonJob(name, message))
def send_vips_message(self, message, name='libra_pool_mgm'):
self.vip_client.submitJob(JsonJob(name, message))
def send_create_message(self, message, name='libra_pool_mgm'):
self.create_client.submitJob(JsonJob(name, message))
def _add_bad_node(self, data):
LOG.info(
'Adding bad device {0} to DB to be deleted'.format(data['name'])
)
device = Device()
device.name = data['name']
device.publicIpAddr = data['addr']
# TODO: kill this field, make things use publicIpAddr instead
device.floatingIpAddr = data['addr']
device.az = data['az']
device.type = data['type']
device.pingCount = 0
device.status = 'DELETED'
device.created = None
with db_session() as session:
session.add(device)
counter = session.query(Counters).\
filter(Counters.name == 'devices_bad_built').first()
counter.value += 1
session.commit()

View File

@ -37,7 +37,6 @@ class OfflineStats(object):
self.server_id = cfg.CONF['admin_api']['server_id']
self.number_of_servers = cfg.CONF['admin_api']['number_of_servers']
self.gearman = GearJobs()
self.start_offline_sched()
def shutdown(self):
@ -81,7 +80,8 @@ class OfflineStats(object):
return (0, 0)
for lb in devices:
node_list.append(lb.name)
failed_lbs = self.gearman.offline_check(node_list)
gearman = GearJobs()
failed_lbs = gearman.offline_check(node_list)
failed = len(failed_lbs)
if failed > self.error_limit:
LOG.error(

View File

@ -36,7 +36,6 @@ class PingStats(object):
self.stats_driver = cfg.CONF['admin_api']['stats_driver']
LOG.info("Selected stats drivers: %s", self.stats_driver)
self.gearman = GearJobs()
self.start_ping_sched()
def shutdown(self):
@ -76,7 +75,8 @@ class PingStats(object):
return (0, 0)
for lb in devices:
node_list.append(lb.name)
failed_lbs, node_status = self.gearman.send_pings(node_list)
gearman = GearJobs()
failed_lbs, node_status = gearman.send_pings(node_list)
failed = len(failed_lbs)
if failed > self.error_limit:
LOG.error(

View File

@ -12,238 +12,213 @@
# License for the specific language governing permissions and limitations
# under the License.
from gearman.constants import JOB_UNKNOWN
from oslo.config import cfg
from libra.common.json_gearman import JSONGearmanClient
from libra.openstack.common import log
import gear
from libra.common.json_gearman import JsonJob
import time
LOG = log.getLogger(__name__)
class GearJobs(object):
class DisconnectClient(gear.Client):
def handleDisconnect(self, job):
job.disconnect = True
class DisconnectJob(JsonJob):
def __init__(self, name, msg, unique=None):
super(GearJobs.DisconnectJob, self).__init__(name, msg, unique)
self.disconnect = False
def __init__(self):
self.poll_timeout = cfg.CONF['admin_api']['stats_poll_timeout']
self.poll_retry = cfg.CONF['admin_api']['stats_poll_timeout_retry']
self.gm_client = gear.Client("stats")
self.gm_client.log = LOG
server_list = []
for server in cfg.CONF['gearman']['servers']:
host, port = server.split(':')
self.gm_client.addServer(host, port,
cfg.CONF['gearman']['ssl_key'],
cfg.CONF['gearman']['ssl_cert'],
cfg.CONF['gearman']['ssl_ca'])
def _all_complete(self, jobs):
for job in jobs:
if not (job.complete or job.disconnect):
return False
return True
def _wait(self, pings):
poll_count = 0
while not self._all_complete(pings) and poll_count < self.poll_retry:
# wait for jobs
time.sleep(self.poll_timeout)
poll_count += 1
server_list.append({'host': host,
'port': int(port),
'keyfile': cfg.CONF['gearman']['ssl_key'],
'certfile': cfg.CONF['gearman']['ssl_cert'],
'ca_certs': cfg.CONF['gearman']['ssl_ca'],
'keepalive': cfg.CONF['gearman']['keepalive'],
'keepcnt': cfg.CONF['gearman']['keepcnt'],
'keepidle': cfg.CONF['gearman']['keepidle'],
'keepintvl': cfg.CONF['gearman']['keepintvl']
})
self.gm_client = JSONGearmanClient(server_list)
def send_pings(self, node_list):
# TODO: lots of duplicated code that needs cleanup
list_of_jobs = []
failed_list = []
node_status = dict()
retry_list = []
submitted_pings = []
# The message name is STATS for historical reasons. Real
# data statistics are gathered with METRICS messages.
job_data = {"hpcs_action": "STATS"}
for node in node_list:
job = GearJobs.DisconnectJob(str(node), job_data)
self.gm_client.submitJob(job)
submitted_pings.append(job)
self._wait(submitted_pings)
list_of_jobs.append(dict(task=str(node), data=job_data))
submitted_pings = self.gm_client.submit_multiple_jobs(
list_of_jobs, background=False, wait_until_complete=True,
poll_timeout=self.poll_timeout
)
for ping in submitted_pings:
if ping.disconnect:
if ping.state == JOB_UNKNOWN:
# TODO: Gearman server failed, ignoring for now
LOG.error('Gearman Job server fail')
continue
if not ping.complete:
if ping.timed_out:
# Ping timeout
retry_list.append(ping)
retry_list.append(ping.job.task)
continue
if ping.msg['hpcs_response'] == 'FAIL':
if ping.result['hpcs_response'] == 'FAIL':
if (
'status' in ping.result and
ping.msg['status'] == 'DELETED'
ping.result['status'] == 'DELETED'
):
continue
# Error returned by Gearman
failed_list.append(ping)
failed_list.append(ping.job.task)
continue
else:
if 'nodes' in ping.msg:
node_status[ping.name] = ping.msg['nodes']
if 'nodes' in ping.result:
node_status[ping.job.task] = ping.result['nodes']
submitted_pings = []
list_of_jobs = []
if len(retry_list) > 0:
LOG.info(
"{0} pings timed out, retrying".format(len(retry_list))
)
for node in retry_list:
job = GearJobs.DisconnectJob(node.name, node.msg)
self.gm_client.submitJob(job)
submitted_pings.append(job)
self._wait(submitted_pings)
list_of_jobs.append(dict(task=str(node), data=job_data))
submitted_pings = self.gm_client.submit_multiple_jobs(
list_of_jobs, background=False, wait_until_complete=True,
poll_timeout=self.poll_retry
)
for ping in submitted_pings:
if ping.disconnect:
if ping.state == JOB_UNKNOWN:
# TODO: Gearman server failed, ignoring for now
LOG.error('Gearman Job server fail')
continue
if not ping.complete:
if ping.timed_out:
# Ping timeout
failed_list.append(ping.name)
failed_list.append(ping.job.task)
continue
if ping.msg['hpcs_response'] == 'FAIL':
if ping.result['hpcs_response'] == 'FAIL':
if (
'status' in ping.msg and
ping.msg['status'] == 'DELETED'
'status' in ping.result and
ping.result['status'] == 'DELETED'
):
continue
# Error returned by Gearman
failed_list.append(ping.name)
failed_list.append(ping.job.task)
continue
else:
if 'nodes' in ping.result:
node_status[ping.name] = ping.msg['nodes']
node_status[ping.job.task] = ping.result['nodes']
return failed_list, node_status
def offline_check(self, node_list):
list_of_jobs = []
failed_list = []
submitted_pings = []
job_data = {"hpcs_action": "DIAGNOSTICS"}
for node in node_list:
job = GearJobs.DisconnectJob(str(node), job_data)
self.gm_client.submitJob(job)
submitted_pings.append(job)
self._wait(submitted_pings)
list_of_jobs.append(dict(task=str(node), data=job_data))
submitted_pings = self.gm_client.submit_multiple_jobs(
list_of_jobs, background=False, wait_until_complete=True,
poll_timeout=self.poll_timeout
)
for ping in submitted_pings:
if ping.disconnect:
if ping.state == JOB_UNKNOWN:
LOG.error(
"Gearman Job server failed during OFFLINE check of {0}".
format(ping.job.task)
)
elif not ping.complete:
failed_list.append(ping.name)
elif ping.msg['network'] == 'FAIL':
failed_list.append(ping.name)
elif ping.timed_out:
failed_list.append(ping.job.task)
elif ping.result['network'] == 'FAIL':
failed_list.append(ping.job.task)
else:
gearman_count = 0
gearman_fail = 0
for gearman_test in ping.msg['gearman']:
for gearman_test in ping.result['gearman']:
gearman_count += 1
if gearman_test['status'] == 'FAIL':
gearman_fail += 1
# Need 2/3rds gearman up
max_fail_count = gearman_count / 3
if gearman_fail > max_fail_count:
failed_list.append(ping.name)
failed_list.append(ping.job.task)
return failed_list
def get_discover(self, name):
# Used in the v2 devices controller
job_data = {"hpcs_action": "DISCOVER"}
job = GearJobs.DisconnectJob(str(name), job_data)
self.gm_client.submitJob(job, gear.PRECEDENCE_HIGH)
poll_count = 0
while not job.complete and not job.disconnect \
and poll_count < self.poll_retry:
# wait for jobs TODO make sure right unit/value
time.sleep(self.poll_timeout)
poll_count += 1
if not job.complete:
job = self.gm_client.submit_job(
str(name), job_data, background=False, wait_until_complete=True,
poll_timeout=10
)
if job.state == JOB_UNKNOWN:
# Gearman server failed
return None
if job.result['hpcs_response'] == 'FAIL':
elif job.timed_out:
# Time out is a fail
return None
elif job.result['hpcs_response'] == 'FAIL':
# Fail response is a fail
return None
return job.result
def get_stats(self, node_list):
# TODO: lots of duplicated code that needs cleanup
list_of_jobs = []
failed_list = []
retry_list = []
submitted_stats = []
results = {}
job_data = {"hpcs_action": "METRICS"}
for node in node_list:
job = GearJobs.DisconnectJob(str(node), job_data)
self.gm_client.submitJob(job)
submitted_stats.append(job)
self._wait(submitted_stats)
list_of_jobs.append(dict(task=str(node), data=job_data))
submitted_stats = self.gm_client.submit_multiple_jobs(
list_of_jobs, background=False, wait_until_complete=True,
poll_timeout=self.poll_timeout
)
for stats in submitted_stats:
if stats.disconnect:
if stats.state == JOB_UNKNOWN:
# TODO: Gearman server failed, ignoring for now
retry_list.append(stats)
elif not stats.complete:
retry_list.append(stats.job.task)
elif stats.timed_out:
# Timeout
retry_list.append(stats)
elif stats.msg['hpcs_response'] == 'FAIL':
retry_list.append(stats.job.task)
elif stats.result['hpcs_response'] == 'FAIL':
# Error returned by Gearman
failed_list.append(stats.name)
failed_list.append(stats.job.task)
else:
# Success
results[stats.name] = stats.msg
results[stats.job.task] = stats.result
submitted_stats = []
list_of_jobs = []
if len(retry_list) > 0:
LOG.info(
"{0} Statistics gathering timed out, retrying".
format(len(retry_list))
)
for node in retry_list:
job = GearJobs.DisconnectJob(node.name, node.msg)
self.gm_client.submitJob(job)
submitted_stats.append(job)
self._wait(submitted_stats)
list_of_jobs.append(dict(task=str(node), data=job_data))
submitted_stats = self.gm_client.submit_multiple_jobs(
list_of_jobs, background=False, wait_until_complete=True,
poll_timeout=self.poll_retry
)
for stats in submitted_stats:
if stats.disconnect:
if stats.state == JOB_UNKNOWN:
# TODO: Gearman server failed, ignoring for now
LOG.error(
"Gearman Job server failed gathering statistics "
"on {0}".format(stats.job.task)
)
failed_list.append(stats.name)
elif not stats.complete:
failed_list.append(stats.job.task)
elif stats.timed_out:
# Timeout
failed_list.append(stats.name)
elif stats.msg['hpcs_response'] == 'FAIL':
failed_list.append(stats.job.task)
elif stats.result['hpcs_response'] == 'FAIL':
# Error returned by Gearman
failed_list.append(stats.name)
failed_list.append(stats.job.task)
else:
# Success
results[stats.name] = stats.msg
results[stats.job.task] = stats.result
return failed_list, results

View File

@ -37,7 +37,6 @@ class UsageStats(object):
self.server_id = cfg.CONF['admin_api']['server_id']
self.number_of_servers = cfg.CONF['admin_api']['number_of_servers']
self.stats_freq = cfg.CONF['admin_api'].stats_freq
self.gearman = GearJobs()
self.start_stats_sched()
@ -102,7 +101,8 @@ class UsageStats(object):
for device in devices:
node_list.append(device.name)
failed_list, results = self.gearman.get_stats(node_list)
gearman = GearJobs()
failed_list, results = gearman.get_stats(node_list)
failed = len(failed_list)
if failed > 0:

View File

@ -13,22 +13,19 @@
# under the License.
import eventlet
import gear
import json
eventlet.monkey_patch()
import ipaddress
from libra.common.json_gearman import JSONGearmanClient
from libra.common.api.lbaas import LoadBalancer, db_session, Device, Node, Vip
from libra.common.api.lbaas import HealthMonitor, Counters
from libra.common.api.lbaas import loadbalancers_devices
from libra.common.api.mnb import update_mnb
from libra.openstack.common import log
from pecan import conf
from time import sleep
LOG = log.getLogger(__name__)
POLL_COUNT = 10
POLL_SLEEP = 10
gearman_workers = [
'UPDATE', # Create/Update a Load Balancer.
@ -42,17 +39,6 @@ gearman_workers = [
]
class DisconnectClient(gear.Client):
def handleDisconnect(self, job):
job.disconnect = True
class DisconnectJob(gear.Job):
def __init__(self, name, arguments):
super(DisconnectJob, self).__init__(name, arguments)
self.disconnect = False
def submit_job(job_type, host, data, lbid):
eventlet.spawn_n(client_job, job_type, str(host), data, lbid)
@ -136,15 +122,19 @@ class GearmanClientThread(object):
self.host = host
self.lbid = lbid
self.gear_client = DisconnectClient()
server_list = []
for server in conf.gearman.server:
ghost, gport = server.split(':')
self.gear_client.addServer(ghost,
int(gport),
conf.gearman.ssl_key,
conf.gearman.ssl_cert,
conf.gearman.ssl_ca)
server_list.append({'host': ghost,
'port': int(gport),
'keyfile': conf.gearman.ssl_key,
'certfile': conf.gearman.ssl_cert,
'ca_certs': conf.gearman.ssl_ca,
'keepalive': conf.gearman.keepalive,
'keepcnt': conf.gearman.keepcnt,
'keepidle': conf.gearman.keepidle,
'keepintvl': conf.gearman.keepintvl})
self.gearman_client = JSONGearmanClient(server_list)
def send_assign(self, data):
NULL = None # For pep8
@ -532,40 +522,28 @@ class GearmanClientThread(object):
mnb_data["tenantid"])
def _send_message(self, message, response_name):
self.gear_client.waitForServer()
job = DisconnectJob(self.host, json.dumps(message))
self.gear_client.submitJob(job)
pollcount = 0
# Would like to make these config file settings
while not job.complete and pollcount < POLL_COUNT:
sleep(POLL_SLEEP)
pollcount += 1
if job.disconnect:
LOG.error('Gearman Job server fail - disconnect')
return False, "Gearman Job server fail - "\
"disconnect communicating with load balancer"
# We timed out waiting for the job to finish
if not job.complete:
LOG.warning('Gearman timeout talking to {0}'.format(self.host))
job_status = self.gearman_client.submit_job(
self.host, message, background=False, wait_until_complete=True,
max_retries=10, poll_timeout=120.0
)
if job_status.state == 'UNKNOWN':
# Gearman server connection failed
LOG.error('Could not talk to gearman server')
return False, "System error communicating with load balancer"
if job_status.timed_out:
# Job timed out
LOG.warning(
'Gearman timeout talking to {0}'.format(self.host)
)
return False, "Timeout error communicating with load balancer"
result = json.loads(job.data[0])
LOG.debug(result)
if 'badRequest' in result:
error = result['badRequest']['validationErrors']
LOG.debug(job_status.result)
if 'badRequest' in job_status.result:
error = job_status.result['badRequest']['validationErrors']
return False, error['message']
if result[response_name] == 'FAIL':
if job_status.result[response_name] == 'FAIL':
# Worker says 'no'
if 'hpcs_error' in result:
error = result['hpcs_error']
if 'hpcs_error' in job_status.result:
error = job_status.result['hpcs_error']
else:
error = 'Load Balancer error'
LOG.error(
@ -573,4 +551,4 @@ class GearmanClientThread(object):
)
return False, error
LOG.info('Gearman success from {0}'.format(self.host))
return True, result
return True, job_status.result

View File

@ -13,14 +13,28 @@
# under the License.
import json
import gear
from gearman import GearmanClient, GearmanWorker, DataEncoder
# Here is the good stuff
class JsonJob(gear.Job):
def __init__(self, name, msg, unique=None):
super(JsonJob, self).__init__(name, json.dumps(msg), unique)
@property
def msg(self):
if self.data:
return json.loads(self.data[0])
class JSONDataEncoder(DataEncoder):
""" Class to transform data that the worker either receives or sends. """
@classmethod
def encode(cls, encodable_object):
""" Encode JSON object as string """
return json.dumps(encodable_object)
@classmethod
def decode(cls, decodable_string):
""" Decode string to JSON object """
return json.loads(decodable_string)
class JSONGearmanWorker(GearmanWorker):
""" Overload the Gearman worker class so we can set the data encoder. """
data_encoder = JSONDataEncoder
class JSONGearmanClient(GearmanClient):
""" Overload the Gearman client class so we can set the data encoder. """
data_encoder = JSONDataEncoder

View File

@ -12,32 +12,20 @@
# License for the specific language governing permissions and limitations
# under the License.
import gear
import json
from time import sleep
from novaclient import exceptions
from oslo.config import cfg
from gearman.constants import JOB_UNKNOWN
from libra.openstack.common import log
from libra.common.json_gearman import JSONGearmanClient
from libra.mgm.nova import Node, BuildError, NotFound
POLL_COUNT = 10
LOG = log.getLogger(__name__)
class DisconnectClient(gear.Client):
def handleDisconnect(self, job):
job.disconnect = True
class DisconnectJob(gear.Job):
def __init__(self, name, arguments):
super(DisconnectJob, self).__init__(name, arguments)
self.disconnect = False
class BuildController(object):
RESPONSE_FIELD = 'response'
RESPONSE_SUCCESS = 'PASS'
RESPONSE_FAILURE = 'FAIL'
@ -108,7 +96,7 @@ class BuildController(object):
)
self.msg[self.RESPONSE_FIELD] = self.RESPONSE_FAILURE
return self.msg
if resp.status_code not in (200, 203):
if resp.status_code not in(200, 203):
LOG.error(
'Error geting status from Nova, error {0}'
.format(resp.status_code)
@ -141,56 +129,44 @@ class BuildController(object):
def _test_node(self, name):
""" Run diags on node, blow it away if bad """
client = DisconnectClient()
server_list = []
for server in cfg.CONF['gearman']['servers']:
host, port = server.split(':')
client.addServer(host,
int(port),
cfg.CONF['gearman']['ssl_key'],
cfg.CONF['gearman']['ssl_cert'],
cfg.CONF['gearman']['ssl_ca'])
client.waitForServer()
server_list.append({'host': host,
'port': int(port),
'keyfile': cfg.CONF['gearman']['ssl_key'],
'certfile': cfg.CONF['gearman']['ssl_cert'],
'ca_certs': cfg.CONF['gearman']['ssl_ca'],
'keepalive': cfg.CONF['gearman']['keepalive'],
'keepcnt': cfg.CONF['gearman']['keepcnt'],
'keepidle': cfg.CONF['gearman']['keepidle'],
'keepintvl': cfg.CONF['gearman']['keepintvl']})
gm_client = JSONGearmanClient(server_list)
job_data = {'hpcs_action': 'DIAGNOSTICS'}
job = DisconnectJob(str(name), json.dumps(job_data))
client.submitJob(job)
pollcount = 0
pollsleepinterval = cfg.CONF['mgm']['build_diag_timeout'] / POLL_COUNT
while not job.complete\
and pollcount < POLL_COUNT\
and not job.disconnect:
sleep(pollsleepinterval)
pollcount += 1
if job.disconnect:
LOG.error('Gearman Job server fail - disconnect')
job_status = gm_client.submit_job(
str(name), job_data, background=False, wait_until_complete=True,
max_retries=10, poll_timeout=10
)
if job_status.state == JOB_UNKNOWN:
# Gearman server connect fail, count as bad node because we can't
# tell if it really is working
LOG.error('Could not talk to gearman server')
return False
# We timed out waiting for the job to finish
if not job.complete:
if job_status.timed_out:
LOG.warning('Timeout getting diags from {0}'.format(name))
return False
result = json.loads(job.data[0])
LOG.debug(result)
LOG.debug(job_status.result)
# Would only happen if DIAGNOSTICS call not supported
if result['hpcs_response'] == 'FAIL':
if job_status.result['hpcs_response'] == 'FAIL':
return True
if result['network'] == 'FAIL':
if job_status.result['network'] == 'FAIL':
return False
gearman_count = 0
gearman_fail = 0
for gearman_test in result['gearman']:
for gearman_test in job_status.result['gearman']:
gearman_count += 1
if gearman_test['status'] == 'FAIL':
LOG.info(

View File

@ -12,11 +12,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import gear
import gearman.errors
import json
import socket
import time
from oslo.config import cfg
from libra.common.json_gearman import JSONGearmanWorker
from libra.mgm.controllers.root import PoolMgmController
from libra.openstack.common import log
@ -24,41 +27,49 @@ from libra.openstack.common import log
LOG = log.getLogger(__name__)
def handler(job):
LOG.debug("Received JSON message: {0}".format(json.dumps(job.arguments)))
controller = PoolMgmController(json.loads(job.arguments))
def handler(worker, job):
LOG.debug("Received JSON message: {0}".format(json.dumps(job.data)))
controller = PoolMgmController(job.data)
response = controller.run()
LOG.debug("Return JSON message: {0}".format(json.dumps(response)))
job.sendWorkComplete(json.dumps(response))
return response
def worker_thread():
LOG.info("Registering task libra_pool_mgm")
hostname = socket.gethostname()
worker = gear.Worker(hostname)
server_list = []
for host_port in cfg.CONF['gearman']['servers']:
host, port = host_port.split(':')
worker.addServer(host,
int(port),
cfg.CONF['gearman']['ssl_key'],
cfg.CONF['gearman']['ssl_cert'],
cfg.CONF['gearman']['ssl_ca'])
worker.registerFunction('libra_pool_mgm')
server_list.append({'host': host,
'port': int(port),
'keyfile': cfg.CONF['gearman']['ssl_key'],
'certfile': cfg.CONF['gearman']['ssl_cert'],
'ca_certs': cfg.CONF['gearman']['ssl_ca'],
'keepalive': cfg.CONF['gearman']['keepalive'],
'keepcnt': cfg.CONF['gearman']['keepcnt'],
'keepidle': cfg.CONF['gearman']['keepidle'],
'keepintvl': cfg.CONF['gearman']['keepintvl']})
worker = JSONGearmanWorker(server_list)
worker.set_client_id(hostname)
worker.register_task('libra_pool_mgm', handler)
worker.logger = LOG
retry = True
while retry:
while (retry):
try:
job = worker.getJob()
handler(job)
worker.work(cfg.CONF['gearman']['poll'])
except KeyboardInterrupt:
retry = False
except Exception as e:
LOG.exception("Exception in pool manager worker: %s, %s"
% (e.__class__, e))
except gearman.errors.ServerUnavailable:
LOG.error("Job server(s) went away. Reconnecting.")
time.sleep(cfg.CONF['gearman']['reconnect_sleep'])
retry = True
except Exception:
LOG.exception("Exception in worker")
retry = False
LOG.debug("Pool manager process terminated.")