Add support for ceph-osd broker requests

This is to support ceph-osd requesting movement
of OSD devices into various buckets

Change-Id: Ief548201e43860c88591c2ac814984a421c023e9
This commit is contained in:
Chris MacNaughton 2016-12-07 07:48:11 -05:00
parent 5d8541b573
commit c046817a52
5 changed files with 115 additions and 7 deletions

View File

@ -362,6 +362,7 @@ def upgrade_keys():
@hooks.hook('osd-relation-joined')
@hooks.hook('osd-relation-changed')
def osd_relation(relid=None):
if ceph.is_quorum():
log('mon cluster in quorum - providing fsid & keys')
@ -374,6 +375,19 @@ def osd_relation(relid=None):
'osd_upgrade_key': ceph.get_named_key('osd-upgrade',
caps=ceph.osd_upgrade_caps),
}
unit = remote_unit()
settings = relation_get(rid=relid, unit=unit)
"""Process broker request(s)."""
if 'broker_req' in settings:
if ceph.is_leader():
rsp = process_requests(settings['broker_req'])
unit_id = unit.replace('/', '-')
unit_response_key = 'broker-rsp-' + unit_id
data[unit_response_key] = rsp
else:
log("Not leader - ignoring broker request", level=DEBUG)
relation_set(relation_id=relid,
relation_settings=data)
# NOTE: radosgw key provision is gated on presence of OSD

1
hooks/osd-relation-changed Symbolic link
View File

@ -0,0 +1 @@
ceph_hooks.py

View File

@ -455,6 +455,33 @@ class CrushLocation(object):
return self.name < other.name
def get_osd_weight(osd_id):
"""
Returns the weight of the specified OSD
:return: Float :raise: ValueError if the monmap fails to parse.
Also raises CalledProcessError if our ceph command fails
"""
try:
tree = subprocess.check_output(
['ceph', 'osd', 'tree', '--format=json'])
try:
json_tree = json.loads(tree)
# Make sure children are present in the json
if not json_tree['nodes']:
return None
for device in json_tree['nodes']:
if device['type'] == 'osd' and device['name'] == osd_id:
return device['crush_weight']
except ValueError as v:
log("Unable to parse ceph tree json: {}. Error: {}".format(
tree, v.message))
raise
except subprocess.CalledProcessError as e:
log("ceph osd tree command failed with message: {}".format(
e.message))
raise
def get_osd_tree(service):
"""
Returns the current osd map in JSON.
@ -1216,12 +1243,12 @@ def osdize_dev(dev, osd_format, osd_journal, reformat_osd=False,
try:
log("osdize cmd: {}".format(cmd))
subprocess.check_call(cmd)
except subprocess.CalledProcessError as e:
except subprocess.CalledProcessError:
if ignore_errors:
log('Unable to initialize device: {}'.format(dev), WARNING)
else:
log('Unable to initialize device: {}'.format(dev), ERROR)
raise e
raise
def osdize_dir(path, encrypt=False):

View File

@ -24,7 +24,11 @@ from charmhelpers.core.hookenv import (
INFO,
ERROR,
)
from ceph import get_cephfs
from ceph import (
get_cephfs,
get_osd_weight
)
from ceph_helpers import Crushmap
from charmhelpers.contrib.storage.linux.ceph import (
create_erasure_profile,
delete_pool,
@ -360,6 +364,36 @@ def handle_rgw_zone_set(request, service):
os.unlink(infile.name)
def handle_put_osd_in_bucket(request, service):
osd_id = request.get('osd')
target_bucket = request.get('bucket')
if not osd_id or not target_bucket:
msg = "Missing OSD ID or Bucket"
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
crushmap = Crushmap()
try:
crushmap.ensure_bucket_is_present(target_bucket)
check_output(
[
'ceph',
'--id', service,
'osd',
'crush',
'set',
str(osd_id),
str(get_osd_weight(osd_id)),
"root={}".format(target_bucket)
]
)
except Exception as exc:
msg = "Failed to move OSD " \
"{} into Bucket {} :: {}".format(osd_id, target_bucket, exc)
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
def handle_rgw_create_user(request, service):
user_id = request.get('rgw-uid')
display_name = request.get('display-name')
@ -534,6 +568,8 @@ def process_requests_v1(reqs):
ret = handle_rgw_regionmap_default(request=req, service=svc)
elif op == "rgw-create-user":
ret = handle_rgw_create_user(request=req, service=svc)
elif op == "move-osd-to-bucket":
ret = handle_put_osd_in_bucket(request=req, service=svc)
else:
msg = "Unknown operation '%s'" % op
log(msg, level=ERROR)

View File

@ -25,6 +25,7 @@
import errno
import hashlib
import math
from charmhelpers.contrib.network.ip import format_ipv6_addr
import six
import os
@ -56,7 +57,8 @@ from charmhelpers.core.host import (mount,
from charmhelpers.fetch import (apt_install, )
from charmhelpers.core.kernel import modprobe
from charmhelpers.contrib.openstack.utils import config_flags_parser
from charmhelpers.contrib.openstack.utils import config_flags_parser, \
get_host_ip
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
KEYFILE = '/etc/ceph/ceph.client.{}.key'
@ -191,6 +193,11 @@ class Crushmap(object):
log("load_crushmap error: {}".format(e))
raise "Failed to read Crushmap"
def ensure_bucket_is_present(self, bucket_name):
if bucket_name not in [bucket.name for bucket in self.buckets()]:
self.add_bucket(bucket_name)
self.save()
def buckets(self):
"""Return a list of buckets that are in the Crushmap."""
return self._buckets
@ -411,7 +418,7 @@ class Pool(object):
# highest value is used. To do this, find the nearest power of 2 such
# that 2^n <= num_pg, check to see if its within the 25% tolerance.
exponent = math.floor(math.log(num_pg, 2))
nearest = 2**exponent
nearest = 2 ** exponent
if (num_pg - nearest) > (num_pg * 0.25):
# Choose the next highest power of 2 since the nearest is more
# than 25% below the original value.
@ -421,7 +428,6 @@ class Pool(object):
class ReplicatedPool(Pool):
def __init__(self,
service,
name,
@ -455,7 +461,6 @@ class ReplicatedPool(Pool):
# Default jerasure erasure coded pool
class ErasurePool(Pool):
def __init__(self,
service,
name,
@ -1179,6 +1184,30 @@ def ensure_ceph_keyring(service, user=None, group=None, relation='ceph'):
return True
def get_mon_hosts():
"""
Helper function to gather up the ceph monitor host public addresses
:return: list. Returns a list of ip_address:port
"""
hosts = []
for relid in relation_ids('mon'):
for unit in related_units(relid):
addr = \
relation_get('ceph-public-address',
unit,
relid) or get_host_ip(
relation_get(
'private-address',
unit,
relid))
if addr:
hosts.append('{}:6789'.format(format_ipv6_addr(addr) or addr))
hosts.sort()
return hosts
def ceph_version():
"""Retrieve the local version of ceph."""
if os.path.exists('/usr/bin/ceph'):
@ -1293,6 +1322,7 @@ class CephBrokerRsp(object):
def exit_msg(self):
return self.rsp.get('stderr')
# Ceph Broker Conversation:
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
# and send that request to ceph via the ceph relation. The CephBrokerRq has a