recon middleware for the object server and utils for cluster monitoring

This commit is contained in:
Florian Hines 2011-07-27 10:41:07 -05:00
parent b6a3767efb
commit aa622eb799
3 changed files with 572 additions and 0 deletions

312
bin/swift-recon Executable file
View File

@ -0,0 +1,312 @@
#! /usr/bin/env python
"""
cmdline utility to perform cluster reconnaissance
"""
from eventlet.green import urllib2
from swift.common.ring import Ring
import simplejson as json
from hashlib import md5
import datetime
import eventlet
import optparse
import os
VERBOSE = False
SUPPRESS_ERRORS = False
def getdevices():
#todo , fitler by zone[s]
ring_file = "/etc/swift/object.ring.gz"
ring_data = Ring(ring_file)
ips = set((n['ip'], n['port']) for n in ring_data.devs)
return ips
def scout(base_url, recon_type):
global VERBOSE, SUPPRESS_ERRORS
url = base_url + recon_type
try:
body = urllib2.urlopen(url).read()
content = json.loads(body)
if VERBOSE:
print "-> %s: %s" % (url, content)
status = 200
except urllib2.HTTPError as e:
if not SUPPRESS_ERRORS or VERBOSE:
print "-> %s: %s" % (url, e)
content = e
status = e.code
except urllib2.URLError as e:
if not SUPPRESS_ERRORS or VERBOSE:
print "-> %s: %s" % (url, e)
content = e
status = -1
return url, content, status
def scout_md5(host):
base_url = "http://%s:%s/recon/" % (host[0], host[1])
url, content, status = scout(base_url, "ringmd5")
return url, content, status
def scout_async(host):
base_url = "http://%s:%s/recon/" % (host[0], host[1])
url, content, status = scout(base_url, "async")
return url, content, status
def scout_replication(host):
base_url = "http://%s:%s/recon/" % (host[0], host[1])
url, content, status = scout(base_url, "replication")
return url, content, status
def scout_load(host):
base_url = "http://%s:%s/recon/" % (host[0], host[1])
url, content, status = scout(base_url, "load")
return url, content, status
def scout_du(host):
base_url = "http://%s:%s/recon/" % (host[0], host[1])
url, content, status = scout(base_url, "diskusage")
return url, content, status
def scout_umount(host):
base_url = "http://%s:%s/recon/" % (host[0], host[1])
url, content, status = scout(base_url, "unmounted")
return url, content, status
def get_ringmd5(ringfile):
stats = {}
matches = 0
errors = 0
hosts = getdevices()
md5sum = md5()
with open(ringfile, 'rb') as f:
block = f.read(4096)
while block:
md5sum.update(block)
block = f.read(4096)
ring_sum = md5sum.hexdigest()
pool = eventlet.GreenPool(20)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking ring md5sum's on %s hosts..." % (now, len(hosts))
if VERBOSE:
print "-> On disk md5sum: %s" % ring_sum
for url, response, status in pool.imap(scout_md5, hosts):
if status == 200:
#fixme - need to grab from config
stats[url] = response[ringfile]
if response[ringfile] != ring_sum:
ringsmatch = False
print "!! %s (%s) doesn't match on disk md5sum" % \
(url, response[ringfile])
else:
matches = matches + 1
if VERBOSE:
print "-> %s matches." % url
else:
errors = errors + 1
print "%s/%s hosts matched, %s error[s] while checking hosts." % \
(matches, len(hosts), errors)
print "=" * 79
def async_check():
ASYNC_COUNTER = 0
stats = {}
hosts = getdevices()
pool = eventlet.GreenPool(20)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking async pendings on %s hosts..." % (now, len(hosts))
for url, response, status in pool.imap(scout_async, hosts):
if status == 200:
stats[url] = response['async_pending']
if len(stats) > 0:
low = min(stats.values())
high = max(stats.values())
total = sum(stats.values())
average = total / len(stats)
print "Async stats: low: %d, high: %d, avg: %d, total: %d" % (low,
high, average, total)
else:
print "Error: No hosts where available or returned valid information."
print "=" * 79
def umount_check():
ASYNC_COUNTER = 0
stats = {}
hosts = getdevices()
pool = eventlet.GreenPool(20)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Getting unmounted drives from %s hosts..." % (now, len(hosts))
for url, response, status in pool.imap(scout_umount, hosts):
if status == 200:
for i in response:
stats[url] = i['device']
for host in stats:
print "Not mounted: %s on %s" % (stats[host], host)
print "=" * 79
def replication_check():
stats = {}
hosts = getdevices()
pool = eventlet.GreenPool(20)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking replication times on %s hosts..." % (now, len(hosts))
for url, response, status in pool.imap(scout_replication, hosts):
if status == 200:
stats[url] = response['object_replication_time']
if len(stats) > 0:
low = min(stats.values())
high = max(stats.values())
total = sum(stats.values())
average = total / len(stats)
print "[Replication Times] shortest: %s, longest: %s, avg: %s" % \
(low, high, average)
else:
print "Error: No hosts where available or returned valid information."
print "=" * 79
def load_check():
load1 = {}
load5 = {}
load15 = {}
hosts = getdevices()
pool = eventlet.GreenPool(20)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking load avg's on %s hosts..." % (now, len(hosts))
for url, response, status in pool.imap(scout_load, hosts):
if status == 200:
load1[url] = response['1m']
load5[url] = response['5m']
load15[url] = response['15m']
stats = {"1m": load1, "5m": load5, "15m": load15}
for item in stats:
if len(stats[item]) > 0:
low = min(stats[item].values())
high = max(stats[item].values())
total = sum(stats[item].values())
average = total / len(stats[item])
print "[%s load average] lowest: %s, highest: %s, avg: %s" % \
(item, low, high, average)
else:
print "Error: Hosts unavailable or returned valid information."
print "=" * 79
def disk_usage():
hosts = getdevices()
stats = {}
highs = []
lows = []
averages = []
percents = {}
pool = eventlet.GreenPool(20)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print "[%s] Checking disk usage on %s hosts..." % (now, len(hosts))
for url, response, status in pool.imap(scout_du, hosts):
if status == 200:
hostusage = []
for entry in response:
if entry['mounted']:
used = float(entry['used']) / float(entry['size']) * 100.0
hostusage.append(round(used, 2))
stats[url] = hostusage
for url in stats:
if len(stats[url]) > 0:
#get per host hi/los for another day
low = min(stats[url])
high = max(stats[url])
total = sum(stats[url])
average = total / len(stats[url])
highs.append(high)
lows.append(low)
averages.append(average)
for percent in stats[url]:
percents[percent] = percents.get(percent, 0) + 1
else:
print "-> %s: Error. No drive info available." % url
if len(lows) > 0:
low = min(lows)
high = max(highs)
average = sum(averages) / len(averages)
#distrib graph shamelessly stolen from https://github.com/gholt/tcod
print "Distribution Graph:"
mul = 69.0 / max(percents.values())
for percent in sorted(percents):
print '% 3d%% % 4d %s' % (percent, percents[percent], \
'*' * int(percents[percent] * mul))
print "Disk usage: lowest: %s%%, highest: %s%%, avg: %s%%" % \
(low, high, average)
else:
print "Error: No hosts where available or returned valid information."
print "=" * 79
def main():
global VERBOSE, SUPPRESS_ERRORS, swift_dir, pool
print "=" * 79
usage = '''
usage: %prog [-v] [--suppress] [-a] [-r] [-u] [-d] [-l] [-c] [--objmd5]
'''
args = optparse.OptionParser(usage)
args.add_option('--verbose', '-v', action="store_true",
help="Print verbose info")
args.add_option('--suppress', action="store_true",
help="Suppress most connection related errors")
args.add_option('--async', '-a', action="store_true",
help="Get async stats")
args.add_option('--replication', '-r', action="store_true",
help="Get replication stats")
args.add_option('--unmounted', '-u', action="store_true",
help="Check cluster for unmounted devices")
args.add_option('--diskusage', '-d', action="store_true",
help="Get disk usage stats")
args.add_option('--loadstats', '-l', action="store_true",
help="Get cluster load average stats")
args.add_option('--connstats', '-c', action="store_true",
help="Get connection stats")
args.add_option('--objmd5', action="store_true",
help="Get md5sums of object.ring.gz and compare to local copy")
args.add_option('--swiftdir', default="/etc/swift",
help="Default = /etc/swift")
options, arguments = args.parse_args()
swift_dir = options.swiftdir
VERBOSE = options.verbose
SUPPRESS_ERRORS = options.suppress
if options.async:
async_check()
if options.unmounted:
umount_check()
if options.replication:
replication_check()
if options.loadstats:
load_check()
if options.diskusage:
disk_usage()
if options.objmd5:
get_ringmd5(os.path.join(swift_dir, 'object.ring.gz'))
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print '\n'

46
bin/swift-recon-cron Executable file
View File

@ -0,0 +1,46 @@
#!/bin/bash
#ghetto temporary cronjob to pull some of the stats for swift-recon
#usage: swift-recon-cron /var/log/swift/storage.log
# run it as frequently as you like, will skip runs during periods
# of high async pendings when the find takes a while.
#todo: everything.
SYSLOG_FACILITY="local0.error"
ASYNC_PATH="/srv/node/sd[a-z]/async_pending/"
RECON_CACHE_PATH="/var/cache/swift/object.recon"
LOCKFILE="/var/lock/swift-recon-object.lock"
if [ -e $LOCKFILE ]; then
echo "NOTICE - $0 lock present - cron jobs overlapping ?"
echo "$0 lock file present" | /usr/bin/logger -p $SYSLOG_FACILITY
exit 1
else
touch $LOCKFILE
fi
if [ -z "$1" ]; then
LOGFILE="/var/log/swift/storage.log"
else
LOGFILE=$1
fi
if [ ! -r "$LOGFILE" ]; then
echo "$0: error $LOGFILE not readable" | /usr/bin/logger -p $SYSLOG_FACILITY
rm $LOCKFILE
exit 1
fi
TMPF=`/bin/mktemp`
asyncs=$(find $ASYNC_PATH -type f 2> /dev/null| wc -l)
#asyncs=$(find /srv/[1-4]/node/sd[a-z]1/async_pending/ -type f 2> /dev/null| wc -l) #saio
objrep=$(grep "Object replication complete." $LOGFILE | tail -n 1 | awk '{print $9}' | sed -e 's/(//g')
objincoming=$(netstat -aln | egrep "tcp.*:6000.*:.*ESTABLISHED" -c)
#objtw=$(netstat -aln | egrep "tcp.*:6000.*:.*TIME_WAIT" -c)
echo "{\"async_pending\":$asyncs, \"object_replication_time\":$objrep, \"object_established_conns\":$objincoming}" > $TMPF
mv $TMPF $RECON_CACHE_PATH
rm -f $TMPF $LOCKFILE
exit 0

View File

@ -0,0 +1,214 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from webob import Request, Response
from swift.common.utils import split_path, cache_from_env, get_logger
from swift.common.constraints import check_mount
from hashlib import md5
import simplejson as json
import os
class ReconMiddleware(object):
"""
Recon middleware used for monitoring.
/recon/load|mem|async... will return various system metrics.
"""
def __init__(self, app, conf, *args, **kwargs):
self.app = app
self.devices = conf.get('devices', '/srv/node/')
swift_dir = conf.get('swift_dir', '/etc/swift')
self.logger = get_logger(conf, log_route='recon')
self.recon_cache_path = conf.get('recon_cache_path', \
'/var/cache/swift')
self.object_recon_cache = "%s/object.recon" % self.recon_cache_path
self.account_ring_path = os.path.join(swift_dir, 'account.ring.gz')
self.container_ring_path = os.path.join(swift_dir, 'container.ring.gz')
self.object_ring_path = os.path.join(swift_dir, 'object.ring.gz')
self.rings = [self.account_ring_path, self.container_ring_path, \
self.object_ring_path]
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
def get_mounted(self):
"""get ALL mounted fs from /proc/mounts"""
mounts = []
with open('/proc/mounts', 'r') as procmounts:
for line in procmounts:
mount = {}
mount['device'], mount['path'], opt1, opt2, opt3, \
opt4 = line.rstrip().split()
mounts.append(mount)
return mounts
def get_load(self):
"""get info from /proc/loadavg"""
loadavg = {}
onemin, fivemin, ftmin, tasks, procs \
= open('/proc/loadavg', 'r').readline().rstrip().split()
loadavg['1m'] = float(onemin)
loadavg['5m'] = float(fivemin)
loadavg['15m'] = float(ftmin)
loadavg['tasks'] = tasks
loadavg['processes'] = int(procs)
return loadavg
def get_mem(self):
"""get info from /proc/meminfo"""
meminfo = {}
with open('/proc/meminfo', 'r') as memlines:
for i in memlines:
entry = i.rstrip().split(":")
meminfo[entry[0]] = entry[1].strip()
return meminfo
def get_async_info(self):
"""get # of async pendings"""
asyncinfo = {}
with open(self.object_recon_cache, 'r') as f:
recondata = json.load(f)
if 'async_pending' in recondata:
asyncinfo['async_pending'] = recondata['async_pending']
else:
self.logger.notice( \
_('NOTICE: Async pendings not in recon data.'))
asyncinfo['async_pending'] = -1
return asyncinfo
def get_replication_info(self):
"""grab last object replication time"""
repinfo = {}
with open(self.object_recon_cache, 'r') as f:
recondata = json.load(f)
if 'object_replication_time' in recondata:
repinfo['object_replication_time'] = \
recondata['object_replication_time']
else:
self.logger.notice( \
_('NOTICE: obj replication time not in recon data'))
repinfo['object_replication_time'] = -1
return repinfo
def get_device_info(self):
"""place holder, grab dev info"""
return self.devices
def get_unmounted(self):
"""list unmounted (failed?) devices"""
mountlist = []
for entry in os.listdir(self.devices):
mpoint = {'device': entry, \
"mounted": check_mount(self.devices, entry)}
if not mpoint['mounted']:
mountlist.append(mpoint)
return mountlist
def get_diskusage(self):
"""get disk utilization statistics"""
devices = []
for entry in os.listdir(self.devices):
if check_mount(self.devices, entry):
path = "%s/%s" % (self.devices, entry)
disk = os.statvfs(path)
capacity = disk.f_bsize * disk.f_blocks
available = disk.f_bsize * disk.f_bavail
used = disk.f_bsize * (disk.f_blocks - disk.f_bavail)
devices.append({'device': entry, 'mounted': True, \
'size': capacity, 'used': used, 'avail': available})
else:
devices.append({'device': entry, 'mounted': False, \
'size': '', 'used': '', 'avail': ''})
return devices
def get_ring_md5(self):
"""get all ring md5sum's"""
sums = {}
for ringfile in self.rings:
md5sum = md5()
with open(ringfile, 'rb') as f:
block = f.read(4096)
while block:
md5sum.update(block)
block = f.read(4096)
sums[ringfile] = md5sum.hexdigest()
return sums
def GET(self, req):
error = False
root, type = split_path(req.path, 1, 2, False)
try:
if type == "mem":
content = json.dumps(self.get_mem())
elif type == "load":
try:
content = json.dumps(self.get_load(), sort_keys=True)
except IOError as e:
error = True
content = "load - %s" % e
elif type == "async":
try:
content = json.dumps(self.get_async_info())
except IOError as e:
error = True
content = "async - %s" % e
elif type == "replication":
try:
content = json.dumps(self.get_replication_info())
except IOError as e:
error = True
content = "replication - %s" % e
elif type == "mounted":
content = json.dumps(self.get_mounted())
elif type == "unmounted":
content = json.dumps(self.get_unmounted())
elif type == "diskusage":
content = json.dumps(self.get_diskusage())
elif type == "ringmd5":
content = json.dumps(self.get_ring_md5())
else:
content = "Invalid path: %s" % req.path
return Response(request=req, status="400 Bad Request", \
body=content, content_type="text/plain")
except ValueError as e:
error = True
content = "ValueError: %s" % e
if not error:
return Response(request=req, body=content, \
content_type="application/json")
else:
msg = 'CRITICAL recon - %s' % str(content)
self.logger.critical(msg)
body = "Internal server error."
return Response(request=req, status="500 Server Error", \
body=body, content_type="text/plain")
def __call__(self, env, start_response):
req = Request(env)
if req.path.startswith('/recon/'):
return self.GET(req)(env, start_response)
else:
return self.app(env, start_response)
def filter_factory(global_conf, **local_conf):
conf = global_conf.copy()
conf.update(local_conf)
def recon_filter(app):
return ReconMiddleware(app, conf)
return recon_filter