changes the admin_api server to use the gear library

Change-Id: I0a9d28665d1ba63eed570fba96517b89f2a2bdde
German Eichberger 2014-06-04 09:36:06 -07:00
parent 90de590e5e
commit 771f6bd76d
10 changed files with 3553 additions and 274 deletions


@ -16,14 +16,14 @@ import ipaddress
import threading
from datetime import datetime
from gearman.constants import JOB_UNKNOWN
from oslo.config import cfg
from sqlalchemy import func
from libra.common.api.lbaas import Device, PoolBuilding, Vip, db_session
from libra.common.api.lbaas import Counters
from libra.common.json_gearman import JSONGearmanClient
from libra.common.json_gearman import JsonJob
from libra.openstack.common import log
import gear
# TODO: Lots of duplication of code here, need to cleanup
@ -31,7 +31,6 @@ LOG = log.getLogger(__name__)
class Pool(object):
DELETE_SECONDS = cfg.CONF['admin_api'].delete_timer_seconds
PROBE_SECONDS = cfg.CONF['admin_api'].probe_timer_seconds
VIPS_SECONDS = cfg.CONF['admin_api'].vips_timer_seconds
@ -49,6 +48,8 @@ class Pool(object):
self.start_probe_sched()
self.start_vips_sched()
self.gear = GearmanWork() # set up the async gearman
def shutdown(self):
if self.probe_timer:
self.probe_timer.cancel()
@ -66,9 +67,8 @@ class Pool(object):
return
LOG.info('Running device delete check')
try:
message = []
with db_session() as session:
devices = session.query(Device).\
devices = session.query(Device). \
filter(Device.status == 'DELETED').all()
for device in devices:
@ -76,17 +76,12 @@ class Pool(object):
'action': 'DELETE_DEVICE',
'name': device.name
}
message.append(dict(task='libra_pool_mgm', data=job_data))
self.gear.send_delete_message(job_data)
counter = session.query(Counters).\
counter = session.query(Counters). \
filter(Counters.name == 'devices_deleted').first()
counter.value += len(devices)
session.commit()
if not message:
LOG.info("No devices to delete")
else:
gear = GearmanWork()
gear.send_delete_message(message)
except:
LOG.exception("Exception when deleting devices")
@ -102,7 +97,7 @@ class Pool(object):
try:
with db_session() as session:
NULL = None # For pep8
vip_count = session.query(Vip).\
vip_count = session.query(Vip). \
filter(Vip.device == NULL).count()
if vip_count >= self.vip_pool_size:
LOG.info("Enough vips exist, no work to do")
@ -128,11 +123,11 @@ class Pool(object):
try:
with db_session() as session:
# Double check we have no outstanding builds assigned to us
session.query(PoolBuilding).\
filter(PoolBuilding.server_id == self.server_id).\
session.query(PoolBuilding). \
filter(PoolBuilding.server_id == self.server_id). \
delete()
session.flush()
dev_count = session.query(Device).\
dev_count = session.query(Device). \
filter(Device.status == 'OFFLINE').count()
if dev_count >= self.node_pool_size:
LOG.info("Enough devices exist, no work to do")
@ -164,8 +159,8 @@ class Pool(object):
# for a long time locking tables
self._build_nodes(build_count)
with db_session() as session:
session.query(PoolBuilding).\
filter(PoolBuilding.server_id == self.server_id).\
session.query(PoolBuilding). \
filter(PoolBuilding.server_id == self.server_id). \
delete()
session.commit()
except:
@ -173,24 +168,14 @@ class Pool(object):
self.start_probe_sched()
def _build_nodes(self, count):
message = []
it = 0
job_data = {'action': 'BUILD_DEVICE'}
while it < count:
message.append(dict(task='libra_pool_mgm', data=job_data))
it += 1
gear = GearmanWork()
gear.send_create_message(message)
for it in range(0, count):
self.gear.send_create_message(job_data)
def _build_vips(self, count):
message = []
it = 0
job_data = {'action': 'BUILD_IP'}
while it < count:
message.append(dict(task='libra_pool_mgm', data=job_data))
it += 1
gear = GearmanWork()
gear.send_vips_message(message)
for it in range(0, count):
self.gear.send_vips_message(job_data)
def start_probe_sched(self):
seconds = datetime.now().second
@ -228,160 +213,126 @@ class Pool(object):
class GearmanWork(object):
def __init__(self):
server_list = []
for server in cfg.CONF['gearman']['servers']:
host, port = server.split(':')
server_list.append({'host': host,
'port': int(port),
'keyfile': cfg.CONF['gearman']['ssl_key'],
'certfile': cfg.CONF['gearman']['ssl_cert'],
'ca_certs': cfg.CONF['gearman']['ssl_ca'],
'keepalive': cfg.CONF['gearman']['keepalive'],
'keepcnt': cfg.CONF['gearman']['keepcnt'],
'keepidle': cfg.CONF['gearman']['keepidle'],
'keepintvl': cfg.CONF['gearman']['keepintvl']
})
self.gearman_client = JSONGearmanClient(server_list)
def send_delete_message(self, message):
LOG.info("Sending %d gearman messages", len(message))
job_status = self.gearman_client.submit_multiple_jobs(
message, background=False, wait_until_complete=True,
max_retries=10, poll_timeout=30.0
)
delete_count = 0
for status in job_status:
if status.state == JOB_UNKNOWN:
LOG.error('Gearman Job server fail')
continue
if status.timed_out:
LOG.error('Gearman timeout whilst deleting device')
continue
if status.result['response'] == 'FAIL':
LOG.error(
'Pool manager failed to delete a device, removing from DB'
)
delete_count += 1
with db_session() as session:
session.query(Device).\
filter(Device.name == status.result['name']).delete()
session.commit()
LOG.info('%d freed devices delete from pool', delete_count)
def send_vips_message(self, message):
# TODO: make this gearman part more async, not wait for all builds
LOG.info("Sending %d gearman messages", len(message))
job_status = self.gearman_client.submit_multiple_jobs(
message, background=False, wait_until_complete=True,
max_retries=10, poll_timeout=3600.0
)
built_count = 0
for status in job_status:
if status.state == JOB_UNKNOWN:
LOG.error('Gearman Job server fail')
continue
if status.timed_out:
LOG.error('Gearman timeout whilst building vip')
continue
if status.result['response'] == 'FAIL':
LOG.error('Pool manager failed to build a vip')
continue
built_count += 1
class VIPClient(gear.Client):
def handleWorkComplete(self, packet):
job = super(GearmanWork.VIPClient, self).handleWorkComplete(packet)
try:
self._add_vip(status.result)
if job.msg['response'] == 'FAIL':
LOG.error('Pool manager failed to build a vip')
else:
self._add_vip(job.msg)
except:
LOG.exception(
'Could not add vip to DB, node data: {0}'
.format(status.result)
.format(job.msg)
)
LOG.info(
'{vips} vips built and added to pool'.format(vips=built_count)
)
def send_create_message(self, message):
# TODO: make this gearman part more async, not wait for all builds
LOG.info("Sending {0} gearman messages".format(len(message)))
job_status = self.gearman_client.submit_multiple_jobs(
message, background=False, wait_until_complete=True,
max_retries=10, poll_timeout=3600.0
)
built_count = 0
for status in job_status:
if status.state == JOB_UNKNOWN:
LOG.error('Gearman Job server fail')
continue
if status.timed_out:
LOG.error('Gearman timeout whilst building device')
continue
if status.result['response'] == 'FAIL':
LOG.error('Pool manager failed to build a device')
if 'name' in status.result:
self._add_bad_node(status.result)
continue
def _add_vip(self, data):
LOG.info('Adding vip {0} to DB'.format(data['ip']))
vip = Vip()
vip.ip = int(ipaddress.IPv4Address(unicode(data['ip'])))
with db_session() as session:
session.add(vip)
counter = session.query(Counters). \
filter(Counters.name == 'vips_built').first()
counter.value += 1
session.commit()
built_count += 1
class DeleteClient(gear.Client):
def handleWorkComplete(self, packet):
job = super(GearmanWork.DeleteClient,
self).handleWorkComplete(packet)
if job.msg['response'] == 'FAIL':
LOG.error(
'Pool manager failed to delete a device, removing from DB')
self._delete_from_db(job.msg)
def _delete_from_db(self, msg):
with db_session() as session:
session.query(Device). \
filter(Device.name == msg['name']).delete()
session.commit()
LOG.info("Delete device %s" % msg['name'])
class CreateClient(gear.Client):
def handleWorkComplete(self, packet):
job = super(GearmanWork.CreateClient,
self).handleWorkComplete(packet)
try:
self._add_node(status.result)
if job.msg['response'] == 'FAIL':
LOG.error('Pool manager failed to build a device')
if 'name' in job.msg:
self._add_bad_node(job.msg)
else:
self._add_node(job.msg)
except:
LOG.exception(
'Could not add node to DB, node data: {0}'
.format(status.result)
.format(job.msg)
)
LOG.info(
'{nodes} devices built and added to pool'.format(nodes=built_count)
)
def _add_vip(self, data):
LOG.info('Adding vip {0} to DB'.format(data['ip']))
vip = Vip()
vip.ip = int(ipaddress.IPv4Address(unicode(data['ip'])))
with db_session() as session:
session.add(vip)
counter = session.query(Counters).\
filter(Counters.name == 'vips_built').first()
counter.value += 1
session.commit()
def _add_node(self, data):
LOG.info('Adding device {0} to DB'.format(data['name']))
device = Device()
device.name = data['name']
device.publicIpAddr = data['addr']
# TODO: kill this field, make things use publicIpAddr instead
device.floatingIpAddr = data['addr']
device.az = data['az']
device.type = data['type']
device.pingCount = 0
device.status = 'OFFLINE'
device.created = None
with db_session() as session:
session.add(device)
counter = session.query(Counters). \
filter(Counters.name == 'devices_built').first()
counter.value += 1
session.commit()
def _add_node(self, data):
LOG.info('Adding device {0} to DB'.format(data['name']))
device = Device()
device.name = data['name']
device.publicIpAddr = data['addr']
# TODO: kill this field, make things use publicIpAddr instead
device.floatingIpAddr = data['addr']
device.az = data['az']
device.type = data['type']
device.pingCount = 0
device.status = 'OFFLINE'
device.created = None
with db_session() as session:
session.add(device)
counter = session.query(Counters).\
filter(Counters.name == 'devices_built').first()
counter.value += 1
session.commit()
def _add_bad_node(self, data):
LOG.info(
"Adding bad device {0} to DB to be deleted" % (data['name']))
device = Device()
device.name = data['name']
device.publicIpAddr = data['addr']
# TODO: kill this field, make things use publicIpAddr instead
device.floatingIpAddr = data['addr']
device.az = data['az']
device.type = data['type']
device.pingCount = 0
device.status = 'DELETED'
device.created = None
with db_session() as session:
session.add(device)
counter = session.query(Counters). \
filter(Counters.name == 'devices_bad_built').first()
counter.value += 1
session.commit()
def _add_bad_node(self, data):
LOG.info(
'Adding bad device {0} to DB to be deleted'.format(data['name'])
)
device = Device()
device.name = data['name']
device.publicIpAddr = data['addr']
# TODO: kill this field, make things use publicIpAddr instead
device.floatingIpAddr = data['addr']
device.az = data['az']
device.type = data['type']
device.pingCount = 0
device.status = 'DELETED'
device.created = None
with db_session() as session:
session.add(device)
counter = session.query(Counters).\
filter(Counters.name == 'devices_bad_built').first()
counter.value += 1
session.commit()
def __init__(self):
self.vip_client = GearmanWork.VIPClient("Vip Client")
self.delete_client = GearmanWork.DeleteClient("Delete Client")
self.create_client = GearmanWork.CreateClient("Create Client")
for x in [self.vip_client, self.create_client, self.delete_client]:
self._init_client(x)
def _init_client(self, client):
client.log = LOG
for server in cfg.CONF['gearman']['servers']:
host, port = server.split(':')
client.addServer(host, port, cfg.CONF['gearman']['ssl_key'],
cfg.CONF['gearman']['ssl_cert'],
cfg.CONF['gearman']['ssl_ca'])
def send_delete_message(self, message, name='libra_pool_mgm'):
self.delete_client.submitJob(JsonJob(name, message))
def send_vips_message(self, message, name='libra_pool_mgm'):
self.vip_client.submitJob(JsonJob(name, message))
def send_create_message(self, message, name='libra_pool_mgm'):
self.create_client.submitJob(JsonJob(name, message))
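The refactor above replaces the blocking submit_multiple_jobs() calls with per-purpose gear.Client subclasses whose handleWorkComplete callbacks record results as responses arrive. A minimal sketch of that callback pattern, assuming a reachable gearmand and a worker registered for libra_pool_mgm; the BuildClient name and the print are illustrative, not part of this change:

import json
import gear


class BuildClient(gear.Client):
    def handleWorkComplete(self, packet):
        # The base class marks the job complete and returns it; the worker's
        # reply is then available in job.data.
        job = super(BuildClient, self).handleWorkComplete(packet)
        result = json.loads(job.data[0])
        print('worker answered: %s' % result.get('response'))
        return job


client = BuildClient('pool client')
client.addServer('localhost', 4730)
client.waitForServer()
client.submitJob(gear.Job('libra_pool_mgm',
                          json.dumps({'action': 'BUILD_DEVICE'})))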


@ -37,6 +37,7 @@ class OfflineStats(object):
self.server_id = cfg.CONF['admin_api']['server_id']
self.number_of_servers = cfg.CONF['admin_api']['number_of_servers']
self.gearman = GearJobs()
self.start_offline_sched()
def shutdown(self):
@ -80,8 +81,7 @@ class OfflineStats(object):
return (0, 0)
for lb in devices:
node_list.append(lb.name)
gearman = GearJobs()
failed_lbs = gearman.offline_check(node_list)
failed_lbs = self.gearman.offline_check(node_list)
failed = len(failed_lbs)
if failed > self.error_limit:
LOG.error(


@ -36,6 +36,7 @@ class PingStats(object):
self.stats_driver = cfg.CONF['admin_api']['stats_driver']
LOG.info("Selected stats drivers: %s", self.stats_driver)
self.gearman = GearJobs()
self.start_ping_sched()
def shutdown(self):
@ -75,8 +76,7 @@ class PingStats(object):
return (0, 0)
for lb in devices:
node_list.append(lb.name)
gearman = GearJobs()
failed_lbs, node_status = gearman.send_pings(node_list)
failed_lbs, node_status = self.gearman.send_pings(node_list)
failed = len(failed_lbs)
if failed > self.error_limit:
LOG.error(


@ -12,213 +12,238 @@
# License for the specific language governing permissions and limitations
# under the License.
from gearman.constants import JOB_UNKNOWN
from oslo.config import cfg
from libra.common.json_gearman import JSONGearmanClient
from libra.openstack.common import log
import gear
from libra.common.json_gearman import JsonJob
import time
LOG = log.getLogger(__name__)
class GearJobs(object):
class DisconnectClient(gear.Client):
def handleDisconnect(self, job):
job.disconnect = True
class DisconnectJob(JsonJob):
def __init__(self, name, msg, unique=None):
super(GearJobs.DisconnectJob, self).__init__(name, msg, unique)
self.disconnect = False
def __init__(self):
self.poll_timeout = cfg.CONF['admin_api']['stats_poll_timeout']
self.poll_retry = cfg.CONF['admin_api']['stats_poll_timeout_retry']
server_list = []
self.gm_client = gear.Client("stats")
self.gm_client.log = LOG
for server in cfg.CONF['gearman']['servers']:
host, port = server.split(':')
server_list.append({'host': host,
'port': int(port),
'keyfile': cfg.CONF['gearman']['ssl_key'],
'certfile': cfg.CONF['gearman']['ssl_cert'],
'ca_certs': cfg.CONF['gearman']['ssl_ca'],
'keepalive': cfg.CONF['gearman']['keepalive'],
'keepcnt': cfg.CONF['gearman']['keepcnt'],
'keepidle': cfg.CONF['gearman']['keepidle'],
'keepintvl': cfg.CONF['gearman']['keepintvl']
})
self.gm_client = JSONGearmanClient(server_list)
self.gm_client.addServer(host, port,
cfg.CONF['gearman']['ssl_key'],
cfg.CONF['gearman']['ssl_cert'],
cfg.CONF['gearman']['ssl_ca'])
def _all_complete(self, jobs):
for job in jobs:
if not (job.complete or job.disconnect):
return False
return True
def _wait(self, pings):
poll_count = 0
while not self._all_complete(pings) and poll_count < self.poll_retry:
# wait for jobs
time.sleep(self.poll_timeout)
poll_count += 1
def send_pings(self, node_list):
# TODO: lots of duplicated code that needs cleanup
list_of_jobs = []
failed_list = []
node_status = dict()
retry_list = []
submitted_pings = []
# The message name is STATS for historical reasons. Real
# data statistics are gathered with METRICS messages.
job_data = {"hpcs_action": "STATS"}
for node in node_list:
list_of_jobs.append(dict(task=str(node), data=job_data))
submitted_pings = self.gm_client.submit_multiple_jobs(
list_of_jobs, background=False, wait_until_complete=True,
poll_timeout=self.poll_timeout
)
job = GearJobs.DisconnectJob(str(node), job_data)
self.gm_client.submitJob(job)
submitted_pings.append(job)
self._wait(submitted_pings)
for ping in submitted_pings:
if ping.state == JOB_UNKNOWN:
if ping.disconnect:
# TODO: Gearman server failed, ignoring for now
LOG.error('Gearman Job server fail')
continue
if ping.timed_out:
if not ping.complete:
# Ping timeout
retry_list.append(ping.job.task)
retry_list.append(ping)
continue
if ping.result['hpcs_response'] == 'FAIL':
if ping.msg['hpcs_response'] == 'FAIL':
if (
'status' in ping.result and
ping.result['status'] == 'DELETED'
ping.msg['status'] == 'DELETED'
):
continue
# Error returned by Gearman
failed_list.append(ping.job.task)
failed_list.append(ping)
continue
else:
if 'nodes' in ping.result:
node_status[ping.job.task] = ping.result['nodes']
if 'nodes' in ping.msg:
node_status[ping.name] = ping.msg['nodes']
list_of_jobs = []
submitted_pings = []
if len(retry_list) > 0:
LOG.info(
"{0} pings timed out, retrying".format(len(retry_list))
)
for node in retry_list:
list_of_jobs.append(dict(task=str(node), data=job_data))
submitted_pings = self.gm_client.submit_multiple_jobs(
list_of_jobs, background=False, wait_until_complete=True,
poll_timeout=self.poll_retry
)
job = GearJobs.DisconnectJob(node.name, node.msg)
self.gm_client.submitJob(job)
submitted_pings.append(job)
self._wait(submitted_pings)
for ping in submitted_pings:
if ping.state == JOB_UNKNOWN:
if ping.disconnect:
# TODO: Gearman server failed, ignoring for now
LOG.error('Gearman Job server fail')
continue
if ping.timed_out:
if not ping.complete:
# Ping timeout
failed_list.append(ping.job.task)
failed_list.append(ping.name)
continue
if ping.result['hpcs_response'] == 'FAIL':
if ping.msg['hpcs_response'] == 'FAIL':
if (
'status' in ping.result and
ping.result['status'] == 'DELETED'
'status' in ping.msg and
ping.msg['status'] == 'DELETED'
):
continue
# Error returned by Gearman
failed_list.append(ping.job.task)
failed_list.append(ping.name)
continue
else:
if 'nodes' in ping.result:
node_status[ping.job.task] = ping.result['nodes']
node_status[ping.name] = ping.msg['nodes']
return failed_list, node_status
def offline_check(self, node_list):
list_of_jobs = []
failed_list = []
submitted_pings = []
job_data = {"hpcs_action": "DIAGNOSTICS"}
for node in node_list:
list_of_jobs.append(dict(task=str(node), data=job_data))
submitted_pings = self.gm_client.submit_multiple_jobs(
list_of_jobs, background=False, wait_until_complete=True,
poll_timeout=self.poll_timeout
)
job = GearJobs.DisconnectJob(str(node), job_data)
self.gm_client.submitJob(job)
submitted_pings.append(job)
self._wait(submitted_pings)
for ping in submitted_pings:
if ping.state == JOB_UNKNOWN:
if ping.disconnect:
LOG.error(
"Gearman Job server failed during OFFLINE check of {0}".
format(ping.job.task)
)
elif ping.timed_out:
failed_list.append(ping.job.task)
elif ping.result['network'] == 'FAIL':
failed_list.append(ping.job.task)
elif not ping.complete:
failed_list.append(ping.name)
elif ping.msg['network'] == 'FAIL':
failed_list.append(ping.name)
else:
gearman_count = 0
gearman_fail = 0
for gearman_test in ping.result['gearman']:
for gearman_test in ping.msg['gearman']:
gearman_count += 1
if gearman_test['status'] == 'FAIL':
gearman_fail += 1
# Need 2/3rds gearman up
max_fail_count = gearman_count / 3
if gearman_fail > max_fail_count:
failed_list.append(ping.job.task)
failed_list.append(ping.name)
return failed_list
def get_discover(self, name):
# Used in the v2 devices controller
job_data = {"hpcs_action": "DISCOVER"}
job = self.gm_client.submit_job(
str(name), job_data, background=False, wait_until_complete=True,
poll_timeout=10
)
if job.state == JOB_UNKNOWN:
# Gearman server failed
job = GearJobs.DisconnectJob(str(name), job_data)
self.gm_client.submitJob(job, gear.PRECEDENCE_HIGH)
poll_count = 0
while not job.complete and not job.disconnect \
and poll_count < self.poll_retry:
# wait for jobs TODO make sure right unit/value
time.sleep(self.poll_timeout)
poll_count += 1
if not job.complete:
return None
elif job.timed_out:
# Time out is a fail
return None
elif job.result['hpcs_response'] == 'FAIL':
if job.result['hpcs_response'] == 'FAIL':
# Fail response is a fail
return None
return job.result
def get_stats(self, node_list):
# TODO: lots of duplicated code that needs cleanup
list_of_jobs = []
failed_list = []
retry_list = []
submitted_stats = []
results = {}
job_data = {"hpcs_action": "METRICS"}
for node in node_list:
list_of_jobs.append(dict(task=str(node), data=job_data))
submitted_stats = self.gm_client.submit_multiple_jobs(
list_of_jobs, background=False, wait_until_complete=True,
poll_timeout=self.poll_timeout
)
job = GearJobs.DisconnectJob(str(node), job_data)
self.gm_client.submitJob(job)
submitted_stats.append(job)
self._wait(submitted_stats)
for stats in submitted_stats:
if stats.state == JOB_UNKNOWN:
if stats.disconnect:
# TODO: Gearman server failed, ignoring for now
retry_list.append(stats.job.task)
elif stats.timed_out:
retry_list.append(stats)
elif not stats.complete:
# Timeout
retry_list.append(stats.job.task)
elif stats.result['hpcs_response'] == 'FAIL':
retry_list.append(stats)
elif stats.msg['hpcs_response'] == 'FAIL':
# Error returned by Gearman
failed_list.append(stats.job.task)
failed_list.append(stats.name)
else:
# Success
results[stats.job.task] = stats.result
results[stats.name] = stats.msg
list_of_jobs = []
submitted_stats = []
if len(retry_list) > 0:
LOG.info(
"{0} Statistics gathering timed out, retrying".
format(len(retry_list))
)
for node in retry_list:
list_of_jobs.append(dict(task=str(node), data=job_data))
submitted_stats = self.gm_client.submit_multiple_jobs(
list_of_jobs, background=False, wait_until_complete=True,
poll_timeout=self.poll_retry
)
job = GearJobs.DisconnectJob(node.name, node.msg)
self.gm_client.submitJob(job)
submitted_stats.append(job)
self._wait(submitted_stats)
for stats in submitted_stats:
if stats.state == JOB_UNKNOWN:
if stats.disconnect:
# TODO: Gearman server failed, ignoring for now
LOG.error(
"Gearman Job server failed gathering statistics "
"on {0}".format(stats.job.task)
)
failed_list.append(stats.job.task)
elif stats.timed_out:
failed_list.append(stats.name)
elif not stats.complete:
# Timeout
failed_list.append(stats.job.task)
elif stats.result['hpcs_response'] == 'FAIL':
failed_list.append(stats.name)
elif stats.msg['hpcs_response'] == 'FAIL':
# Error returned by Gearman
failed_list.append(stats.job.task)
failed_list.append(stats.name)
else:
# Success
results[stats.job.task] = stats.result
results[stats.name] = stats.msg
return failed_list, results
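GearJobs now tracks completion itself: jobs are wrapped in a Job subclass that carries a disconnect flag, a Client subclass sets that flag in handleDisconnect, and _wait() polls until every job is either complete or disconnected. A rough standalone sketch of that polling pattern, assuming a local gearmand; the worker function name is made up for the example:

import time
import gear


class FlaggedJob(gear.Job):
    def __init__(self, name, arguments, unique=None):
        super(FlaggedJob, self).__init__(name, arguments, unique)
        self.disconnect = False


class FlaggingClient(gear.Client):
    def handleDisconnect(self, job):
        # Mark the job so the poll loop stops waiting on it.
        job.disconnect = True


client = FlaggingClient('ping client')
client.addServer('localhost', 4730)
client.waitForServer()

jobs = [FlaggedJob('some-lb-device', '{"hpcs_action": "STATS"}')]
for job in jobs:
    client.submitJob(job)

# Poll, like _wait() above, until every job has completed or lost its server.
for _ in range(10):
    if all(j.complete or j.disconnect for j in jobs):
        break
    time.sleep(1)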


@ -37,6 +37,7 @@ class UsageStats(object):
self.server_id = cfg.CONF['admin_api']['server_id']
self.number_of_servers = cfg.CONF['admin_api']['number_of_servers']
self.stats_freq = cfg.CONF['admin_api'].stats_freq
self.gearman = GearJobs()
self.start_stats_sched()
@ -101,8 +102,7 @@ class UsageStats(object):
for device in devices:
node_list.append(device.name)
gearman = GearJobs()
failed_list, results = gearman.get_stats(node_list)
failed_list, results = self.gearman.get_stats(node_list)
failed = len(failed_list)
if failed > 0:


@ -12,8 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
from gearman import GearmanClient, GearmanWorker, DataEncoder
import json
import gear
class JSONDataEncoder(DataEncoder):
@ -38,3 +39,14 @@ class JSONGearmanWorker(GearmanWorker):
class JSONGearmanClient(GearmanClient):
""" Overload the Gearman client class so we can set the data encoder. """
data_encoder = JSONDataEncoder
# Here is the good stuff
class JsonJob(gear.Job):
def __init__(self, name, msg, unique=None):
super(JsonJob, self).__init__(name, json.dumps(msg), unique)
@property
def msg(self):
if self.data:
return json.loads(self.data[0])
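JsonJob keeps the JSON handling that JSONGearmanClient used to provide: the payload dict is serialized into the job's arguments at construction time, and the worker's JSON reply is decoded back through the msg property once gear appends it to job.data. A small usage sketch; the reply shown is hypothetical:

from libra.common.json_gearman import JsonJob

job = JsonJob('libra_pool_mgm', {'action': 'BUILD_IP'})
job.arguments        # '{"action": "BUILD_IP"}'

# After the worker responds, gear appends the reply to job.data and
# job.msg decodes it, e.g. {'response': 'PASS', 'ip': '10.0.0.5'}.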

libra/gear/__init__.py (new file, 2918 lines; diff suppressed because it is too large)

libra/gear/acl.py (new file, 289 lines)

@ -0,0 +1,289 @@
# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
class ACLError(Exception):
pass
class ACLEntry(object):
"""An access control list entry.
:arg str subject: The SSL certificate Subject Common Name to which
the entry applies.
:arg str register: A regular expression that matches the jobs that
connections with this certificate are permitted to register.
:arg str invoke: A regular expression that matches the jobs that
connections with this certificate are permitted to invoke.
Also implies the permission to cancel the same set of jobs in
the queue.
:arg boolean grant: A flag indicating whether connections with
this certificate are permitted to grant access to other
connections. Also implies the permission to revoke access
from other connections. The ability to self-revoke access is
always implied.
"""
def __init__(self, subject, register=None, invoke=None, grant=False):
self.subject = subject
self.setRegister(register)
self.setInvoke(invoke)
self.setGrant(grant)
def __repr__(self):
return ('<ACLEntry for %s register=%s invoke=%s grant=%s>' %
(self.subject, self.register, self.invoke, self.grant))
def isEmpty(self):
"""Checks whether this entry grants any permissions at all.
:returns: False if any permission is granted, otherwise True.
"""
if (self.register is None and
self.invoke is None and
self.grant is False):
return True
return False
def canRegister(self, name):
"""Check whether this subject is permitted to register a function.
:arg str name: The function name to check.
:returns: A boolean indicating whether the action should be permitted.
"""
if self.register is None:
return False
if not self._register.match(name):
return False
return True
def canInvoke(self, name):
"""Check whether this subject is permitted to register a function.
:arg str name: The function name to check.
:returns: A boolean indicating whether the action should be permitted.
"""
if self.invoke is None:
return False
if not self._invoke.match(name):
return False
return True
def setRegister(self, register):
"""Sets the functions that this subject can register.
:arg str register: A regular expression that matches the jobs that
connections with this certificate are permitted to register.
"""
self.register = register
if register:
try:
self._register = re.compile(register)
except re.error, e:
raise ACLError('Regular expression error: %s' % (e.message,))
else:
self._register = None
def setInvoke(self, invoke):
"""Sets the functions that this subject can invoke.
:arg str invoke: A regular expression that matches the jobs that
connections with this certificate are permitted to invoke.
"""
self.invoke = invoke
if invoke:
try:
self._invoke = re.compile(invoke)
except re.error, e:
raise ACLError('Regular expression error: %s' % (e.message,))
else:
self._invoke = None
def setGrant(self, grant):
"""Sets whether this subject can grant ACLs to others.
:arg boolean grant: A flag indicating whether connections with
this certificate are permitted to grant access to other
connections. Also implies the permission to revoke access
from other connections. The ability to self-revoke access is
always implied.
"""
self.grant = grant
class ACL(object):
"""An access control list.
ACLs are deny-by-default. The checked actions are only allowed if
there is an explicit rule in the ACL granting permission for a
given client (identified by SSL certificate Common Name Subject)
to perform that action.
"""
def __init__(self):
self.subjects = {}
def add(self, entry):
"""Add an ACL entry.
:arg Entry entry: The :py:class:`ACLEntry` to add.
:raises ACLError: If there is already an entry for the subject.
"""
if entry.subject in self.subjects:
raise ACLError("An ACL entry for %s already exists" %
(entry.subject,))
self.subjects[entry.subject] = entry
def remove(self, subject):
"""Remove an ACL entry.
:arg str subject: The SSL certificate Subject Common Name to
remove from the ACL.
:raises ACLError: If there is no entry for the subject.
"""
if subject not in self.subjects:
raise ACLError("There is no ACL entry for %s" % (subject,))
del self.subjects[subject]
def getEntries(self):
"""Return a list of current ACL entries.
:returns: A list of :py:class:`ACLEntry` objects.
"""
items = self.subjects.items()
items.sort(lambda a, b: cmp(a[0], b[0]))
return [x[1] for x in items]
def canRegister(self, subject, name):
"""Check whether a subject is permitted to register a function.
:arg str subject: The SSL certificate Subject Common Name to
check against.
:arg str name: The function name to check.
:returns: A boolean indicating whether the action should be permitted.
"""
entry = self.subjects.get(subject)
if entry is None:
return False
return entry.canRegister(name)
def canInvoke(self, subject, name):
"""Check whether a subject is permitted to invoke a function.
:arg str subject: The SSL certificate Subject Common Name to
check against.
:arg str name: The function name to check.
:returns: A boolean indicating whether the action should be permitted.
"""
entry = self.subjects.get(subject)
if entry is None:
return False
return entry.canInvoke(name)
def canGrant(self, subject):
"""Check whether a subject is permitted to grant access to others.
:arg str subject: The SSL certificate Subject Common Name to
check against.
:returns: A boolean indicating whether the action should be permitted.
"""
entry = self.subjects.get(subject)
if entry is None:
return False
if not entry.grant:
return False
return True
def grantInvoke(self, subject, invoke):
"""Grant permission to invoke certain functions.
:arg str subject: The SSL certificate Subject Common Name to which
the entry applies.
:arg str invoke: A regular expression that matches the jobs
that connections with this certificate are permitted to
invoke. Also implies the permission to cancel the same
set of jobs in the queue.
"""
e = self.subjects.get(subject)
if not e:
e = ACLEntry(subject)
self.add(e)
e.setInvoke(invoke)
def grantRegister(self, subject, register):
"""Grant permission to register certain functions.
:arg str subject: The SSL certificate Subject Common Name to which
the entry applies.
:arg str register: A regular expression that matches the jobs that
connections with this certificate are permitted to register.
"""
e = self.subjects.get(subject)
if not e:
e = ACLEntry(subject)
self.add(e)
e.setRegister(register)
def grantGrant(self, subject):
"""Grant permission to grant permissions to other connections.
:arg str subject: The SSL certificate Subject Common Name to which
the entry applies.
"""
e = self.subjects.get(subject)
if not e:
e = ACLEntry(subject)
self.add(e)
e.setGrant(True)
def revokeInvoke(self, subject):
"""Revoke permission to invoke all functions.
:arg str subject: The SSL certificate Subject Common Name to which
the entry applies.
"""
e = self.subjects.get(subject)
if e:
e.setInvoke(None)
if e.isEmpty():
self.remove(subject)
def revokeRegister(self, subject):
"""Revoke permission to register all functions.
:arg str subject: The SSL certificate Subject Common Name to which
the entry applies.
"""
e = self.subjects.get(subject)
if e:
e.setRegister(None)
if e.isEmpty():
self.remove(subject)
def revokeGrant(self, subject):
"""Revoke permission to grant permissions to other connections.
:arg str subject: The SSL certificate Subject Common Name to which
the entry applies.
"""
e = self.subjects.get(subject)
if e:
e.setGrant(False)
if e.isEmpty():
self.remove(subject)
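The docstrings above describe a deny-by-default access control list keyed on the SSL certificate Subject Common Name. A short illustrative use of the class; the subject names and patterns are made up for the example:

from libra.gear.acl import ACL, ACLEntry

acl = ACL()
acl.add(ACLEntry('pool-mgm.example.com', register='libra_pool_mgm'))
acl.grantInvoke('admin-api.example.com', 'libra_.*')

acl.canRegister('pool-mgm.example.com', 'libra_pool_mgm')   # True
acl.canInvoke('admin-api.example.com', 'libra_pool_mgm')    # True
acl.canInvoke('pool-mgm.example.com', 'libra_pool_mgm')     # False (deny by default)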

libra/gear/constants.py (new file, 83 lines)

@ -0,0 +1,83 @@
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Protocol Constants
==================
These are not necessary for normal API usage. See the `Gearman
protocol reference <http://gearman.org/protocol>`_ for an explanation
of each of these.
Magic Codes
-----------
.. py:data:: REQ
The Gearman magic code for a request.
.. py:data:: RES
The Gearman magic code for a response.
Packet Types
------------
"""
types = {
1: 'CAN_DO',
2: 'CANT_DO',
3: 'RESET_ABILITIES',
4: 'PRE_SLEEP',
# unused
6: 'NOOP',
7: 'SUBMIT_JOB',
8: 'JOB_CREATED',
9: 'GRAB_JOB',
10: 'NO_JOB',
11: 'JOB_ASSIGN',
12: 'WORK_STATUS',
13: 'WORK_COMPLETE',
14: 'WORK_FAIL',
15: 'GET_STATUS',
16: 'ECHO_REQ',
17: 'ECHO_RES',
18: 'SUBMIT_JOB_BG',
19: 'ERROR',
20: 'STATUS_RES',
21: 'SUBMIT_JOB_HIGH',
22: 'SET_CLIENT_ID',
23: 'CAN_DO_TIMEOUT',
24: 'ALL_YOURS',
25: 'WORK_EXCEPTION',
26: 'OPTION_REQ',
27: 'OPTION_RES',
28: 'WORK_DATA',
29: 'WORK_WARNING',
30: 'GRAB_JOB_UNIQ',
31: 'JOB_ASSIGN_UNIQ',
32: 'SUBMIT_JOB_HIGH_BG',
33: 'SUBMIT_JOB_LOW',
34: 'SUBMIT_JOB_LOW_BG',
35: 'SUBMIT_JOB_SCHED',
36: 'SUBMIT_JOB_EPOCH',
}
for i, name in types.items():
globals()[name] = i
__doc__ += '\n.. py:data:: %s\n' % name
REQ = b'\x00REQ'
RES = b'\x00RES'
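The loop at the end of the module injects each packet-type name into the module namespace, so the numeric codes in the protocol can be referenced symbolically. For example:

from libra.gear import constants

constants.WORK_COMPLETE                 # 13
constants.JOB_CREATED                   # 8
constants.types[constants.WORK_FAIL]    # 'WORK_FAIL'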


@ -2,7 +2,8 @@ pbr>=0.5.21,<1.0
Babel>=0.9.6
eventlet
gear
# put back once it's patched
# gear
gearman>=2.0.2
oslo.config>=1.2.0
python-daemon>=1.6