Migrate to shared lib

This change moves our ceph.py into
a seperate repository that we can share between various
ceph related Juju projects, along with a Makefile
change to use a new git_sync file to partially sync
a git repository into a specified path

Change-Id: Iaf3ea38b6e5268c517d53b36105b70f23de891bb
This commit is contained in:
Chris MacNaughton 2016-08-01 16:12:00 -04:00
parent 7becad3865
commit f9993191ab
12 changed files with 488 additions and 47 deletions

View File

@ -15,12 +15,21 @@ functional_test:
bin/charm_helpers_sync.py:
@mkdir -p bin
@bzr cat lp:charm-helpers/tools/charm_helpers_sync/charm_helpers_sync.py \
> bin/charm_helpers_sync.py
> bin/charm_helpers_sync.py
sync: bin/charm_helpers_sync.py
bin/git_sync.py:
@mkdir -p bin
@wget -O bin/git_sync.py https://raw.githubusercontent.com/ChrisMacNaughton/git-sync/master/git_sync.py
ch-sync: bin/charm_helpers_sync.py
$(PYTHON) bin/charm_helpers_sync.py -c charm-helpers-hooks.yaml
$(PYTHON) bin/charm_helpers_sync.py -c charm-helpers-tests.yaml
git-sync: bin/git_sync.py
$(PYTHON) bin/git_sync.py -d lib/ceph -s https://github.com/CanonicalLtd/charms_ceph.git
sync: git-sync ch-sync
publish: lint
bzr push lp:charms/ceph-osd
bzr push lp:charms/trusty/ceph-osd

View File

@ -14,3 +14,4 @@
import sys
sys.path.append('hooks')
sys.path.append('lib')

View File

@ -20,13 +20,14 @@ import os
import sys
from subprocess import check_call
sys.path.append('lib')
sys.path.append('hooks')
from charmhelpers.core.hookenv import (
action_fail,
)
from ceph import get_local_osd_ids
from ceph.ceph.ceph import get_local_osd_ids
from ceph_hooks import assess_status
from utils import (

View File

@ -22,7 +22,8 @@ import socket
import time
import netifaces
import ceph
sys.path.append('lib')
from ceph.ceph import ceph
from charmhelpers.core import hookenv
from charmhelpers.core.hookenv import (
log,

0
lib/ceph/__init__.py Normal file
View File

View File

@ -0,0 +1 @@
__author__ = 'chris'

View File

@ -4,52 +4,50 @@
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ctypes
import ctypes.util
import errno
import json
import subprocess
import time
import os
import re
import sys
import errno
import shutil
from charmhelpers.cli.host import mounts
from charmhelpers.core import hookenv
from charmhelpers.core.host import (
mkdir,
chownr,
cmp_pkgrevno,
service_restart,
lsb_release,
service_stop,
service_restart)
cmp_pkgrevno, service_stop, mounts)
from charmhelpers.core.hookenv import (
log,
ERROR,
WARNING,
DEBUG,
cached,
status_set,
)
WARNING, DEBUG)
from charmhelpers.core.services import render_template
from charmhelpers.fetch import (
apt_cache
)
from charmhelpers.contrib.storage.linux.utils import (
zap_disk,
is_block_device,
is_device_mounted,
)
zap_disk,
is_device_mounted)
from utils import (
get_unit_hostname,
render_template)
)
LEADER = 'leader'
PEON = 'peon'
@ -499,6 +497,30 @@ def get_local_osd_ids():
return osd_ids
def get_local_mon_ids():
"""
This will list the /var/lib/ceph/mon/* directories and try
to split the ID off of the directory name and return it in
a list
:return: list. A list of monitor identifiers :raise: OSError if
something goes wrong with listing the directory.
"""
mon_ids = []
mon_path = os.path.join(os.sep, 'var', 'lib', 'ceph', 'mon')
if os.path.exists(mon_path):
try:
dirs = os.listdir(mon_path)
for mon_dir in dirs:
# Basically this takes everything after ceph- as the monitor ID
match = re.search('ceph-(?P<mon_id>.*)', mon_dir)
if match:
mon_ids.append(match.group('mon_id'))
except OSError:
raise
return mon_ids
def _is_int(v):
"""Return True if the object v can be turned into an integer."""
try:
@ -509,7 +531,7 @@ def _is_int(v):
def get_version():
'''Derive Ceph release from an installed package.'''
"""Derive Ceph release from an installed package."""
import apt_pkg as apt
cache = apt_cache()
@ -600,6 +622,7 @@ def is_leader():
def wait_for_quorum():
while not is_quorum():
log("Waiting for quorum to be reached")
time.sleep(3)
@ -782,7 +805,7 @@ def is_bootstrapped():
def wait_for_bootstrap():
while (not is_bootstrapped()):
while not is_bootstrapped():
time.sleep(3)
@ -815,6 +838,18 @@ def import_osd_upgrade_key(key):
]
subprocess.check_call(cmd)
def generate_monitor_secret():
cmd = [
'ceph-authtool',
'/dev/stdout',
'--name=mon.',
'--gen-key'
]
res = subprocess.check_output(cmd)
return "{}==".format(res.split('=')[1].strip())
# OSD caps taken from ceph-create-keys
_osd_bootstrap_caps = {
'mon': [
@ -878,9 +913,12 @@ def import_radosgw_key(key):
# OSD caps taken from ceph-create-keys
_radosgw_caps = {
'mon': ['allow r'],
'mon': ['allow rw'],
'osd': ['allow rwx']
}
_upgrade_caps = {
'mon': ['allow rwx']
}
def get_radosgw_key():
@ -888,10 +926,34 @@ def get_radosgw_key():
_default_caps = {
'mon': ['allow r'],
'mon': ['allow rw'],
'osd': ['allow rwx']
}
admin_caps = {
'mds': ['allow'],
'mon': ['allow *'],
'osd': ['allow *']
}
osd_upgrade_caps = {
'mon': ['allow command "config-key"',
'allow command "osd tree"',
'allow command "config-key list"',
'allow command "config-key put"',
'allow command "config-key get"',
'allow command "config-key exists"',
'allow command "osd out"',
'allow command "osd in"',
'allow command "osd rm"',
'allow command "auth del"',
]
}
def get_upgrade_key():
return get_named_key('upgrade-osd', _upgrade_caps)
def get_named_key(name, caps=None):
caps = caps or _default_caps
@ -916,6 +978,19 @@ def get_named_key(name, caps=None):
return parse_key(subprocess.check_output(cmd).strip()) # IGNORE:E1103
def upgrade_key_caps(key, caps):
""" Upgrade key to have capabilities caps """
if not is_leader():
# Not the MON leader OR not clustered
return
cmd = [
"sudo", "-u", ceph_user(), 'ceph', 'auth', 'caps', key
]
for subsystem, subcaps in caps.iteritems():
cmd.extend([subsystem, '; '.join(subcaps)])
subprocess.check_call(cmd)
@cached
def systemd():
return (lsb_release()['DISTRIB_CODENAME'] >= 'vivid')

View File

@ -0,0 +1,352 @@
#!/usr/bin/python
#
# Copyright 2016 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from charmhelpers.core.hookenv import (
log,
DEBUG,
INFO,
ERROR,
)
from charmhelpers.contrib.storage.linux.ceph import (
create_erasure_profile,
delete_pool,
erasure_profile_exists,
get_osds,
pool_exists,
pool_set,
remove_pool_snapshot,
rename_pool,
set_pool_quota,
snapshot_pool,
validator,
ErasurePool,
Pool,
ReplicatedPool,
)
# This comes from http://docs.ceph.com/docs/master/rados/operations/pools/
# This should do a decent job of preventing people from passing in bad values.
# It will give a useful error message
POOL_KEYS = {
# "Ceph Key Name": [Python type, [Valid Range]]
"size": [int],
"min_size": [int],
"crash_replay_interval": [int],
"pgp_num": [int], # = or < pg_num
"crush_ruleset": [int],
"hashpspool": [bool],
"nodelete": [bool],
"nopgchange": [bool],
"nosizechange": [bool],
"write_fadvise_dontneed": [bool],
"noscrub": [bool],
"nodeep-scrub": [bool],
"hit_set_type": [basestring, ["bloom", "explicit_hash",
"explicit_object"]],
"hit_set_count": [int, [1, 1]],
"hit_set_period": [int],
"hit_set_fpp": [float, [0.0, 1.0]],
"cache_target_dirty_ratio": [float],
"cache_target_dirty_high_ratio": [float],
"cache_target_full_ratio": [float],
"target_max_bytes": [int],
"target_max_objects": [int],
"cache_min_flush_age": [int],
"cache_min_evict_age": [int],
"fast_read": [bool],
}
CEPH_BUCKET_TYPES = [
'osd',
'host',
'chassis',
'rack',
'row',
'pdu',
'pod',
'room',
'datacenter',
'region',
'root'
]
def decode_req_encode_rsp(f):
"""Decorator to decode incoming requests and encode responses."""
def decode_inner(req):
return json.dumps(f(json.loads(req)))
return decode_inner
@decode_req_encode_rsp
def process_requests(reqs):
"""Process Ceph broker request(s).
This is a versioned api. API version must be supplied by the client making
the request.
"""
request_id = reqs.get('request-id')
try:
version = reqs.get('api-version')
if version == 1:
log('Processing request {}'.format(request_id), level=DEBUG)
resp = process_requests_v1(reqs['ops'])
if request_id:
resp['request-id'] = request_id
return resp
except Exception as exc:
log(str(exc), level=ERROR)
msg = ("Unexpected error occurred while processing requests: %s" %
reqs)
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
msg = ("Missing or invalid api version (%s)" % version)
resp = {'exit-code': 1, 'stderr': msg}
if request_id:
resp['request-id'] = request_id
return resp
def handle_create_erasure_profile(request, service):
# "local" | "shec" or it defaults to "jerasure"
erasure_type = request.get('erasure-type')
# "host" | "rack" or it defaults to "host" # Any valid Ceph bucket
failure_domain = request.get('failure-domain')
name = request.get('name')
k = request.get('k')
m = request.get('m')
l = request.get('l')
if failure_domain not in CEPH_BUCKET_TYPES:
msg = "failure-domain must be one of {}".format(CEPH_BUCKET_TYPES)
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
create_erasure_profile(service=service, erasure_plugin_name=erasure_type,
profile_name=name, failure_domain=failure_domain,
data_chunks=k, coding_chunks=m, locality=l)
def handle_erasure_pool(request, service):
pool_name = request.get('name')
erasure_profile = request.get('erasure-profile')
quota = request.get('max-bytes')
weight = request.get('weight')
if erasure_profile is None:
erasure_profile = "default-canonical"
# Check for missing params
if pool_name is None:
msg = "Missing parameter. name is required for the pool"
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
# TODO: Default to 3/2 erasure coding. I believe this requires min 5 osds
if not erasure_profile_exists(service=service, name=erasure_profile):
# TODO: Fail and tell them to create the profile or default
msg = ("erasure-profile {} does not exist. Please create it with: "
"create-erasure-profile".format(erasure_profile))
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
pool = ErasurePool(service=service, name=pool_name,
erasure_code_profile=erasure_profile,
percent_data=weight)
# Ok make the erasure pool
if not pool_exists(service=service, name=pool_name):
log("Creating pool '%s' (erasure_profile=%s)" % (pool.name,
erasure_profile),
level=INFO)
pool.create()
# Set a quota if requested
if quota is not None:
set_pool_quota(service=service, pool_name=pool_name, max_bytes=quota)
def handle_replicated_pool(request, service):
pool_name = request.get('name')
replicas = request.get('replicas')
quota = request.get('max-bytes')
weight = request.get('weight')
# Optional params
pg_num = request.get('pg_num')
if pg_num:
# Cap pg_num to max allowed just in case.
osds = get_osds(service)
if osds:
pg_num = min(pg_num, (len(osds) * 100 // replicas))
# Check for missing params
if pool_name is None or replicas is None:
msg = "Missing parameter. name and replicas are required"
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
kwargs = {}
if pg_num:
kwargs['pg_num'] = pg_num
if weight:
kwargs['percent_data'] = weight
if replicas:
kwargs['replicas'] = replicas
pool = ReplicatedPool(service=service,
name=pool_name, **kwargs)
if not pool_exists(service=service, name=pool_name):
log("Creating pool '%s' (replicas=%s)" % (pool.name, replicas),
level=INFO)
pool.create()
else:
log("Pool '%s' already exists - skipping create" % pool.name,
level=DEBUG)
# Set a quota if requested
if quota is not None:
set_pool_quota(service=service, pool_name=pool_name, max_bytes=quota)
def handle_create_cache_tier(request, service):
# mode = "writeback" | "readonly"
storage_pool = request.get('cold-pool')
cache_pool = request.get('hot-pool')
cache_mode = request.get('mode')
if cache_mode is None:
cache_mode = "writeback"
# cache and storage pool must exist first
if not pool_exists(service=service, name=storage_pool) or not pool_exists(
service=service, name=cache_pool):
msg = ("cold-pool: {} and hot-pool: {} must exist. Please create "
"them first".format(storage_pool, cache_pool))
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
p = Pool(service=service, name=storage_pool)
p.add_cache_tier(cache_pool=cache_pool, mode=cache_mode)
def handle_remove_cache_tier(request, service):
storage_pool = request.get('cold-pool')
cache_pool = request.get('hot-pool')
# cache and storage pool must exist first
if not pool_exists(service=service, name=storage_pool) or not pool_exists(
service=service, name=cache_pool):
msg = ("cold-pool: {} or hot-pool: {} doesn't exist. Not "
"deleting cache tier".format(storage_pool, cache_pool))
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
pool = Pool(name=storage_pool, service=service)
pool.remove_cache_tier(cache_pool=cache_pool)
def handle_set_pool_value(request, service):
# Set arbitrary pool values
params = {'pool': request.get('name'),
'key': request.get('key'),
'value': request.get('value')}
if params['key'] not in POOL_KEYS:
msg = "Invalid key '%s'" % params['key']
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
# Get the validation method
validator_params = POOL_KEYS[params['key']]
if len(validator_params) is 1:
# Validate that what the user passed is actually legal per Ceph's rules
validator(params['value'], validator_params[0])
else:
# Validate that what the user passed is actually legal per Ceph's rules
validator(params['value'], validator_params[0], validator_params[1])
# Set the value
pool_set(service=service, pool_name=params['pool'], key=params['key'],
value=params['value'])
def process_requests_v1(reqs):
"""Process v1 requests.
Takes a list of requests (dicts) and processes each one. If an error is
found, processing stops and the client is notified in the response.
Returns a response dict containing the exit code (non-zero if any
operation failed along with an explanation).
"""
ret = None
log("Processing %s ceph broker requests" % (len(reqs)), level=INFO)
for req in reqs:
op = req.get('op')
log("Processing op='%s'" % op, level=DEBUG)
# Use admin client since we do not have other client key locations
# setup to use them for these operations.
svc = 'admin'
if op == "create-pool":
pool_type = req.get('pool-type') # "replicated" | "erasure"
# Default to replicated if pool_type isn't given
if pool_type == 'erasure':
ret = handle_erasure_pool(request=req, service=svc)
else:
ret = handle_replicated_pool(request=req, service=svc)
elif op == "create-cache-tier":
ret = handle_create_cache_tier(request=req, service=svc)
elif op == "remove-cache-tier":
ret = handle_remove_cache_tier(request=req, service=svc)
elif op == "create-erasure-profile":
ret = handle_create_erasure_profile(request=req, service=svc)
elif op == "delete-pool":
pool = req.get('name')
ret = delete_pool(service=svc, name=pool)
elif op == "rename-pool":
old_name = req.get('name')
new_name = req.get('new-name')
ret = rename_pool(service=svc, old_name=old_name,
new_name=new_name)
elif op == "snapshot-pool":
pool = req.get('name')
snapshot_name = req.get('snapshot-name')
ret = snapshot_pool(service=svc, pool_name=pool,
snapshot_name=snapshot_name)
elif op == "remove-pool-snapshot":
pool = req.get('name')
snapshot_name = req.get('snapshot-name')
ret = remove_pool_snapshot(service=svc, pool_name=pool,
snapshot_name=snapshot_name)
elif op == "set-pool-value":
ret = handle_set_pool_value(request=req, service=svc)
else:
msg = "Unknown operation '%s'" % op
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
if type(ret) == dict and 'exit-code' in ret:
return ret
return {'exit-code': 0}

View File

@ -14,4 +14,5 @@
import sys
sys.path.append('hooks')
sys.path.append('lib')
sys.path.append('actions')

View File

@ -18,7 +18,7 @@ import posix
from mock import call, Mock, patch
import test_utils
import ceph
from ceph.ceph import ceph
import replace_osd
TO_PATCH = [
@ -73,13 +73,13 @@ class ReplaceOsdTestCase(test_utils.CharmTestCase):
])
assert ret == 0
@patch('ceph.mounts')
@patch('ceph.subprocess')
@patch('ceph.umount')
@patch('ceph.osdize')
@patch('ceph.shutil')
@patch('ceph.systemd')
@patch('ceph.ceph_user')
@patch('ceph.ceph.ceph.mounts')
@patch('ceph.ceph.ceph.subprocess')
@patch('ceph.ceph.ceph.umount')
@patch('ceph.ceph.ceph.osdize')
@patch('ceph.ceph.ceph.shutil')
@patch('ceph.ceph.ceph.systemd')
@patch('ceph.ceph.ceph.ceph_user')
def test_replace_osd(self,
ceph_user,
systemd,

View File

@ -1,7 +1,7 @@
__author__ = 'Chris Holcombe <chris.holcombe@canonical.com>'
from mock import patch, call
import test_utils
import ceph
from ceph.ceph import ceph
TO_PATCH = [
'hookenv',
@ -16,8 +16,8 @@ class PerformanceTestCase(test_utils.CharmTestCase):
super(PerformanceTestCase, self).setUp(ceph, TO_PATCH)
def test_tune_nic(self):
with patch('ceph.get_link_speed', return_value=10000):
with patch('ceph.save_sysctls') as save_sysctls:
with patch('ceph.ceph.ceph.get_link_speed', return_value=10000):
with patch('ceph.ceph.ceph.save_sysctls') as save_sysctls:
ceph.tune_nic('eth0')
save_sysctls.assert_has_calls(
[
@ -49,12 +49,12 @@ class PerformanceTestCase(test_utils.CharmTestCase):
uuid = ceph.get_block_uuid('/dev/sda1')
self.assertEqual(uuid, '378f3c86-b21a-4172-832d-e2b3d4bc7511')
@patch('ceph.persist_settings')
@patch('ceph.set_hdd_read_ahead')
@patch('ceph.get_max_sectors_kb')
@patch('ceph.get_max_hw_sectors_kb')
@patch('ceph.set_max_sectors_kb')
@patch('ceph.get_block_uuid')
@patch('ceph.ceph.ceph.persist_settings')
@patch('ceph.ceph.ceph.set_hdd_read_ahead')
@patch('ceph.ceph.ceph.get_max_sectors_kb')
@patch('ceph.ceph.ceph.get_max_hw_sectors_kb')
@patch('ceph.ceph.ceph.set_max_sectors_kb')
@patch('ceph.ceph.ceph.get_block_uuid')
def test_tune_dev(self,
block_uuid,
set_max_sectors_kb,
@ -84,12 +84,12 @@ class PerformanceTestCase(test_utils.CharmTestCase):
call('maintenance', 'Finished tuning device /dev/sda')
])
@patch('ceph.persist_settings')
@patch('ceph.set_hdd_read_ahead')
@patch('ceph.get_max_sectors_kb')
@patch('ceph.get_max_hw_sectors_kb')
@patch('ceph.set_max_sectors_kb')
@patch('ceph.get_block_uuid')
@patch('ceph.ceph.ceph.persist_settings')
@patch('ceph.ceph.ceph.set_hdd_read_ahead')
@patch('ceph.ceph.ceph.get_max_sectors_kb')
@patch('ceph.ceph.ceph.get_max_hw_sectors_kb')
@patch('ceph.ceph.ceph.set_max_sectors_kb')
@patch('ceph.ceph.ceph.get_block_uuid')
def test_tune_dev_2(self,
block_uuid,
set_max_sectors_kb,

View File

@ -16,7 +16,7 @@ import time
from mock import patch, call, MagicMock
from ceph import CrushLocation
from ceph.ceph.ceph import CrushLocation
import test_utils
import ceph_hooks