Entrypoint rewrite in Python

Change-Id: I1d4d607b65bf0e8d52c4efd5e670e346049adde5
This commit is contained in:
Proskurin Kirill 2016-11-22 20:30:02 +00:00
parent 802dc48db0
commit a0c461b2bf
6 changed files with 632 additions and 427 deletions

View File

@ -1,113 +0,0 @@
#!/bin/bash
#
# Script to make a proxy (ie HAProxy) capable of monitoring Percona XtraDB
# Cluster nodes properly
#
# This variant runs as an endless loop: every 5 seconds it queries the local
# mysqld for its wsrep state and either refreshes or deletes this node's
# record under /v2/keys/pxc-cluster/<cluster>/<ip> in the discovery service.
#
# Authors:
# Raghavendra Prabhu <raghavendra.prabhu@percona.com>
# Olaf van Zandwijk <olaf.vanzandwijk@nedap.com>
#
# Based on the original script from Unai Rodriguez and Olaf
# (https://github.com/olafz/percona-clustercheck)
#
# Grant privileges required:
# GRANT PROCESS ON *.* TO 'clustercheckuser'@'localhost' IDENTIFIED BY
# 'clustercheckpassword!';
set -e
# Forward logs to docker log collector
exec 1>/proc/1/fd/2 2>/proc/1/fd/2
# NOTE(review): the usage text lists four arguments including <log_file>, but
# the script only reads $1 (donor), $2 (read-only) and $3 (defaults file).
if [[ $1 == '-h' || $1 == '--help' ]];then
echo "Usage: $0 <available_when_donor=0|1> <log_file> <available_when_readonly=0|1> <defaults_extra_file>"
exit
fi
# Values substituted by the template engine when this file is rendered.
MYSQL_USERNAME=monitor
MYSQL_PASSWORD={{ percona.monitor_password }}
DISCOVERY_SERVICE={{ address("etcd", etcd.client_port) }}
CLUSTER_NAME={{ percona.cluster_name }}
# Positional arguments with their defaults.
AVAILABLE_WHEN_DONOR=${1:-0}
AVAILABLE_WHEN_READONLY=${2:-1}
DEFAULTS_EXTRA_FILE=${3:-/etc/my.cnf}
CURL="curl -sS"
# FIRST_RUN delays the very first check by 30 seconds (see the loop below).
FIRST_RUN=1
# CLUSTER_NAME and DISCOVERY_SERVICE are assigned from the template above;
# they are not read from the environment in this script.
# Timeout exists for instances where mysqld may be hung
TIMEOUT=10
# Build the credential part of the mysql command line.
EXTRA_ARGS=""
if [[ -n "$MYSQL_USERNAME" ]]; then
EXTRA_ARGS="$EXTRA_ARGS --user=${MYSQL_USERNAME}"
fi
if [[ -n "$MYSQL_PASSWORD" ]]; then
EXTRA_ARGS="$EXTRA_ARGS --password=${MYSQL_PASSWORD}"
fi
# Use the defaults file only when it is readable.
if [[ -r $DEFAULTS_EXTRA_FILE ]];then
MYSQL_CMDLINE="mysql --defaults-extra-file=$DEFAULTS_EXTRA_FILE -nNE --connect-timeout=$TIMEOUT \
${EXTRA_ARGS}"
else
MYSQL_CMDLINE="mysql -nNE --connect-timeout=$TIMEOUT ${EXTRA_ARGS}"
fi
# Identity of this node as published to the discovery service.
ipaddr=$(hostname -i | awk ' { print $1 } ')
hostname=$(hostname)
while true
do
# Give mysqld 30 seconds to come up before the first check.
if [ $FIRST_RUN -eq 1 ]; then
sleep 30
FIRST_RUN=0
fi
#
# Perform the query to check the wsrep_local_state
#
# The fixed 30 second delay above is the only guard against querying mysqld
# before it is ready.
# WSREP_STATUS[0] receives line 2 and WSREP_STATUS[1] line 5 of the grep
# output; the condition below treats them as the local state number and the
# cluster status.
WSREP_STATUS=($($MYSQL_CMDLINE -e "SHOW GLOBAL STATUS LIKE 'wsrep_%';" \
| grep -A 1 -E 'wsrep_local_state$|wsrep_cluster_status$' \
| sed -n -e '2p' -e '5p' | tr '\n' ' '))
# Healthy when the cluster is Primary and the local state is 4, or 2 when
# donors are allowed.
if [[ ${WSREP_STATUS[1]} == 'Primary' && ( ${WSREP_STATUS[0]} -eq 4 || \
( ${WSREP_STATUS[0]} -eq 2 && $AVAILABLE_WHEN_DONOR -eq 1 ) ) ]]
then
# Check only when set to 0 to avoid latency in response.
if [[ $AVAILABLE_WHEN_READONLY -eq 0 ]];then
READ_ONLY=$($MYSQL_CMDLINE -e "SHOW GLOBAL VARIABLES LIKE 'read_only';")
if [[ "${READ_ONLY}" == "ON" ]];then
# Percona XtraDB Cluster node local state is 'Synced', but it is in
# read-only mode. The variable AVAILABLE_WHEN_READONLY is set to 0.
# The node's record is removed from the discovery service.
# NOTE(review): the HTTP 503 / return-code wording of the original
# clustercheck does not apply here; nothing is returned. Execution also
# falls through to the "Node is fine" update below, which re-creates the
# record that was just deleted - confirm whether that is intended.
date
echo "Read-only node. Destroying"
$CURL http://$DISCOVERY_SERVICE/v2/keys/pxc-cluster/$CLUSTER_NAME/$ipaddr/?recursive=true -XDELETE
fi
fi
# Node is considered healthy: refresh the ipaddr and hostname keys and the
# parent directory, each with a 30 second TTL.
date
echo "Node is fine. Updating TTL"
$CURL http://$DISCOVERY_SERVICE/v2/keys/pxc-cluster/$CLUSTER_NAME/$ipaddr/ipaddr -XPUT -d value="$ipaddr" -d ttl=30
$CURL http://$DISCOVERY_SERVICE/v2/keys/pxc-cluster/$CLUSTER_NAME/$ipaddr/hostname -XPUT -d value="$hostname" -d ttl=30
$CURL http://$DISCOVERY_SERVICE/v2/keys/pxc-cluster/$CLUSTER_NAME/$ipaddr -XPUT -d ttl=30 -d dir=true -d prevExist=true
else
# Node did not pass the health condition above: remove its record from the
# discovery service.
date
echo "Node state is not Synced. Destroying."
$CURL http://$DISCOVERY_SERVICE/v2/keys/pxc-cluster/$CLUSTER_NAME/$ipaddr/?recursive=true -XDELETE
fi
sleep 5
done

View File

@ -1,123 +0,0 @@
#!/usr/bin/env python
import functools
import logging
import os
import socket
import sys
import time
import etcd
# Retry policy used by the retry() decorator: total number of attempts and
# the pause in seconds between them.
CONNECTION_ATTEMPTS = 3
CONNECTION_DELAY = 5
# Logging setup: timestamp with milliseconds, level and message.
LOG_DATEFMT = "%Y-%m-%d %H:%M:%S"
LOG_FORMAT = "%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s"
logging.basicConfig(format=LOG_FORMAT, datefmt=LOG_DATEFMT)
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)
# etcd directory holding this cluster's members; the cluster name is filled
# in by the template engine.
ETCD_PATH = "/pxc-cluster/{{ percona.cluster_name }}"
# Identity of this node, resolved once at import time.
HOSTNAME = socket.getfqdn()
IPADDR = socket.gethostbyname(HOSTNAME)
def retry(f):
    """Decorator that re-runs *f* when it raises ``etcd.EtcdException``.

    The wrapped callable is tried up to CONNECTION_ATTEMPTS times in total,
    sleeping CONNECTION_DELAY seconds after each failed try. The last try is
    made outside the ``try`` block, so its exception reaches the caller.
    """
    @functools.wraps(f)
    def wrap(*args, **kwargs):
        remaining = CONNECTION_ATTEMPTS
        pause = CONNECTION_DELAY
        while remaining > 1:
            try:
                return f(*args, **kwargs)
            except etcd.EtcdException as exc:
                LOG.warning('Etcd is not ready: %s', str(exc))
                LOG.warning('Retrying in %d seconds...', pause)
                time.sleep(pause)
                remaining -= 1
        # Final attempt: let any error propagate.
        return f(*args, **kwargs)
    return wrap
def get_etcd_client():
return etcd.Client(host="{{ address("etcd") }}",
port={{ etcd.client_port.cont }},
allow_reconnect=True,
read_timeout=2)
@retry
def fetch_status(etcd_client):
    """Return the entries registered under ETCD_PATH, relative to it.

    Every leaf key below ETCD_PATH is returned with the ``ETCD_PATH/`` prefix
    stripped; the directory key itself is skipped.
    """
    prefix = ETCD_PATH + "/"
    result = []
    for leaf in etcd_client.read(ETCD_PATH).leaves:
        leaf_key = str(leaf.key)
        if leaf_key != ETCD_PATH:
            result.append(leaf_key.replace(prefix, ''))
    LOG.info("Current cluster state is: %s", result)
    return result
def _etcd_set(etcd_client, data, ttl):
    """Store one attribute of this node in etcd.

    *data* is a ``(name, value)`` pair; the value is written to
    ``ETCD_PATH/IPADDR/<name>`` with the given *ttl*.
    """
    name = data[0]
    value = data[1]
    key = os.path.join(ETCD_PATH, IPADDR, name)
    etcd_client.set(key, value, ttl=ttl)
    LOG.info("Set %s with value '%s'", key, value)
def _etcd_create_dir(etcd_client, ttl):
    """(Re)create this node's directory ``ETCD_PATH/IPADDR`` with a TTL.

    If the directory already exists it is treated as stale and deleted
    recursively first. The directory is then written exactly once; the
    original repeated the same write and log call in both branches.
    """
    key = os.path.join(ETCD_PATH, IPADDR)
    try:
        etcd_client.get(key)
        LOG.warning("Found stale key '%s', deleting", key)
        etcd_client.delete(key, recursive=True, dir=True)
    except etcd.EtcdKeyNotFound:
        # Nothing to clean up (or it vanished in the meantime).
        pass
    etcd_client.write(key, None, ttl=ttl, dir=True)
    LOG.info("Set ttl for '%s' directory to %s", key, ttl)
@retry
def set_status(etcd_client, ttl=30):
    """Publish this node's directory and its attributes to etcd.

    Writes the ``ctime``, ``ipaddr`` and ``hostname`` keys under the node's
    directory, each with *ttl*.
    """
    # NOTE(review): the client passed in by the caller is replaced by a fresh
    # one here, so the argument is effectively unused - confirm intent.
    etcd_client = get_etcd_client()
    _etcd_create_dir(etcd_client, ttl)
    attributes = (
        ('ctime', time.time()),
        ('ipaddr', IPADDR),
        ('hostname', HOSTNAME),
    )
    for attribute in attributes:
        _etcd_set(etcd_client, attribute, ttl)
def create_join_list(status):
    """Return a comma-separated list of the other nodes in *status*.

    This node's own address is removed from *status* (in place) when it is
    present. An empty string is returned when no other node is left.
    """
    # Guard the removal: list.remove() raises ValueError for a missing item,
    # which would abort the whole registration for a harmless condition.
    if IPADDR in status:
        status.remove(IPADDR)
    return ','.join(status)
def main(ttl):
    """Register this node in etcd and print the nodes it should join.

    The whole sequence runs under the ``galera_bootstrap`` etcd lock, taken
    with *ttl* as the lock TTL. The join list is printed to stdout. Any
    error is logged and swallowed, so nothing is printed in that case.
    """
    # Initialised up front so the cleanup below never touches an unbound
    # name when client or lock construction fails.
    lock = None
    try:
        etcd_client = get_etcd_client()
        lock = etcd.Lock(etcd_client, 'galera_bootstrap')
        LOG.info("Locking...")
        lock.acquire(blocking=True, lock_ttl=ttl)
        LOG.info("Successfuly acquired lock")
        set_status(etcd_client, ttl)
        status = fetch_status(etcd_client)
        # This output will be stdout == data
        print(create_join_list(status))
    except Exception as err:
        LOG.exception(err)
    finally:
        if lock is not None:
            lock.release()
            LOG.info("Successfuly released lock")


if __name__ == "__main__":
    main(ttl=60)

View File

@ -0,0 +1,628 @@
#!/usr/bin/env python
import functools
import json
import logging
import os
import os.path
import shutil
import socket
import subprocess
import six.moves
import sys
import time
import etcd
import pymysql.cursors
# Identity of this node, resolved once at import time.
HOSTNAME = socket.getfqdn()
IPADDR = socket.gethostbyname(HOSTNAME)
# MySQL data directory and the files inside it that this script uses.
DATADIR = "/var/lib/mysql"
# Marker file created once the data directory has been initialised.
INIT_FILE = os.path.join(DATADIR, 'init.ok')
PID_FILE = os.path.join(DATADIR, "mysqld.pid")
# The node's seqno is read from this file (see mysql_get_seqno).
GRASTATE_FILE = os.path.join(DATADIR, 'grastate.dat')
# JSON file the configuration is loaded from (see get_config).
GLOBALS_PATH = '/etc/ccp/globals/globals.json'
# Cluster size the queue and "last node" checks compare against.
EXPECTED_NODES = 3
# Logging setup: timestamp with milliseconds, level and message.
LOG_DATEFMT = "%Y-%m-%d %H:%M:%S"
LOG_FORMAT = "%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s"
logging.basicConfig(format=LOG_FORMAT, datefmt=LOG_DATEFMT)
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)
# Placeholders filled in by set_globals() from the loaded configuration.
MYSQL_ROOT_PASSWORD = None
CLUSTER_NAME = None
XTRABACKUP_PASSWORD = None
MONITOR_PASSWORD = None
CONNECTION_ATTEMPTS = None
CONNECTION_DELAY = None
ETCD_PATH = None
ETCD_HOST = None
ETCD_PORT = None
class ProcessException(Exception):
    """Raised when a child command finishes with a non-zero exit code.

    Attributes:
        exit_code: the exit code of the command.
        msg: the formatted error message, also used as the exception text.
    """

    def __init__(self, exit_code):
        message = "Command exited with code %d" % exit_code
        super(ProcessException, self).__init__(message)
        self.exit_code = exit_code
        self.msg = message
def retry(f):
    """Decorator that re-runs *f* when it raises ``etcd.EtcdException``.

    CONNECTION_ATTEMPTS and CONNECTION_DELAY are read on every call, so the
    values assigned by set_globals() are used. The callable is tried up to
    CONNECTION_ATTEMPTS times in total; the last try happens outside the
    ``try`` block, so its exception reaches the caller.
    """
    @functools.wraps(f)
    def wrap(*args, **kwargs):
        remaining = CONNECTION_ATTEMPTS
        pause = CONNECTION_DELAY
        while remaining > 1:
            try:
                return f(*args, **kwargs)
            except etcd.EtcdException as exc:
                LOG.warning('Etcd is not ready: %s', str(exc))
                LOG.warning('Retrying in %d seconds...', pause)
                time.sleep(pause)
                remaining -= 1
        # Final attempt: let any error propagate.
        return f(*args, **kwargs)
    return wrap
def get_config():
    """Load the configuration sections this script needs.

    Reads the JSON document at GLOBALS_PATH and returns a dict holding its
    'percona', 'db', 'etcd' and 'namespace' top-level entries.

    Raises:
        IOError / OSError: the globals file cannot be opened.
        ValueError: the file is not valid JSON.
        KeyError: one of the required sections is missing.
    """
    LOG.info("Getting global variables from %s", GLOBALS_PATH)
    with open(GLOBALS_PATH) as f:
        global_conf = json.load(f)
    variables = {key: global_conf[key]
                 for key in ('percona', 'db', 'etcd', 'namespace')}
    # Log only the section names: the values contain the MySQL root,
    # xtrabackup and monitor passwords, which must not be written to the log.
    LOG.debug("Loaded config sections: %s", sorted(variables))
    return variables
def set_globals():
    """Populate the module-level settings from the loaded configuration.

    Credentials and the cluster name come from the 'db' and 'percona'
    sections, the retry policy and client port from 'etcd', and the etcd
    host name is derived from 'namespace'.
    """
    global MYSQL_ROOT_PASSWORD, CLUSTER_NAME, XTRABACKUP_PASSWORD
    global MONITOR_PASSWORD, CONNECTION_ATTEMPTS, CONNECTION_DELAY
    global ETCD_PATH, ETCD_HOST, ETCD_PORT

    config = get_config()

    MYSQL_ROOT_PASSWORD = config['db']['root_password']

    percona = config['percona']
    CLUSTER_NAME = percona['cluster_name']
    XTRABACKUP_PASSWORD = percona['xtrabackup_password']
    MONITOR_PASSWORD = percona['monitor_password']

    etcd_conf = config['etcd']
    CONNECTION_ATTEMPTS = etcd_conf['connection_attempts']
    CONNECTION_DELAY = etcd_conf['connection_delay']

    ETCD_PATH = "/galera/%s" % percona['cluster_name']
    ETCD_HOST = "etcd.%s" % config['namespace']
    ETCD_PORT = int(etcd_conf['client_port']['cont'])
def get_mysql_client(insecure=False):
    """Open a root connection to the local mysqld over its unix socket.

    With ``insecure=True`` an empty password is used; otherwise
    MYSQL_ROOT_PASSWORD. Connect and read timeouts are one second and rows
    are returned as dicts.
    """
    if insecure:
        password = ''
    else:
        password = MYSQL_ROOT_PASSWORD
    connection = pymysql.connect(unix_socket='/var/run/mysqld/mysqld.sock',
                                 user='root',
                                 password=password,
                                 connect_timeout=1,
                                 read_timeout=1,
                                 cursorclass=pymysql.cursors.DictCursor)
    return connection
def get_etcd_client():
    """Return an etcd client for ETCD_HOST:ETCD_PORT.

    The client reconnects automatically and uses a 2 second read timeout.
    """
    client = etcd.Client(host=ETCD_HOST,
                         port=ETCD_PORT,
                         allow_reconnect=True,
                         read_timeout=2)
    return client
def datadir_cleanup(path):
    """Delete everything inside *path* while keeping *path* itself.

    Directories are removed recursively, anything else is unlinked.
    """
    for entry in os.listdir(path):
        target = os.path.join(path, entry)
        remove = shutil.rmtree if os.path.isdir(target) else os.remove
        remove(target)
def create_init_flag():
    """Create the empty INIT_FILE marker unless it already exists."""
    if os.path.isfile(INIT_FILE):
        LOG.debug("Init file: '%s' already exists", INIT_FILE)
        return
    with open(INIT_FILE, 'a'):
        pass
    LOG.debug("Create init_ok file: %s", INIT_FILE)
def run_cmd(cmd, check_result=False):
    """Start *cmd* through the shell and return the ``Popen`` object.

    With ``check_result=True`` the call blocks until the command ends and
    raises ProcessException if its exit code is non-zero. Otherwise the
    process is returned immediately and keeps running in the background.
    """
    LOG.debug("Executing cmd:\n%s", cmd)
    child = subprocess.Popen(cmd, shell=True)
    if not check_result:
        return child
    child.communicate()
    if child.returncode != 0:
        raise ProcessException(child.returncode)
    return child
def run_mysqld(available_nodes):
    """Start mysqld as a cluster member and wait until it accepts queries.

    *available_nodes* is the comma-separated peer list placed after
    ``gcomm://``; an empty string makes that address empty. Returns the
    ``Popen`` object of the started command.
    """
    template = " ".join([
        "mysqld --user=mysql --wsrep_cluster_name='%s'",
        "--wsrep_cluster_address='gcomm://%s'",
        "--wsrep_sst_method=xtrabackup-v2",
        "--wsrep_sst_auth='xtrabackup:%s'",
        "--wsrep_node_address='%s'",
        "--pxc_strict_mode=PERMISSIVE",
    ])
    values = (CLUSTER_NAME, available_nodes, XTRABACKUP_PASSWORD, IPADDR)
    mysqld_proc = run_cmd(template % values)
    wait_for_mysqld_to_start(mysqld_proc, insecure=False)
    return mysqld_proc
def mysql_exec(mysql_client, sql_list):
    """Run a list of ``(statement, args)`` pairs on one cursor.

    Each statement is logged and executed in order; the rows fetched after
    the last statement are returned.
    """
    with mysql_client.cursor() as cur:
        for statement, params in sql_list:
            LOG.debug("Executing mysql cmd: %s\nWith the following args: '%s'",
                      statement, params)
            cur.execute(statement, params)
        rows = cur.fetchall()
    return rows
@retry
def fetch_status(etcd_client, path):
    """List the entries directly below ``ETCD_PATH/<path>``.

    Child keys are returned with the directory prefix stripped; the
    directory key itself is skipped. An empty list is returned when the
    directory does not exist.
    """
    key = os.path.join(ETCD_PATH, path)
    prefix = key + "/"
    try:
        root = etcd_client.get(key)
        result = []
        for child in root.children:
            child_key = str(child.key)
            if child_key != key:
                result.append(child_key.replace(prefix, ''))
        LOG.debug("Current nodes in %s is: %s", key, result)
        return result
    except etcd.EtcdKeyNotFound:
        LOG.debug("Current nodes in %s is: %s", key, None)
        return []
def fetch_wsrep_data():
    """Return the local mysqld's ``wsrep%`` status variables as a dict.

    A fresh root connection is opened for the query and closed again before
    returning. Callers poll this function in a loop, so leaving the
    connection open would accumulate one connection per poll.

    Returns:
        dict mapping each ``Variable_name`` to its ``Value``.
    """
    mysql_client = get_mysql_client()
    try:
        rows = mysql_exec(mysql_client,
                          [("SHOW STATUS LIKE 'wsrep%'", None)])
    finally:
        try:
            mysql_client.close()
        except pymysql.Error:
            # The driver reports an error when the connection is already
            # closed (for example after a failed query); nothing to do.
            pass
    return {row['Variable_name']: row['Value'] for row in rows}
@retry
def get_oldest_node_by_seqno(etcd_client, path):
    """
    Return the IP address of the node with the highest seqno.

    seqno (sequence number) indicates the number of transactions that ran
    through a node, so the node with the highest seqno is the node with the
    latest data.
    """
    key = os.path.join(ETCD_PATH, path)
    # Cut the etcd path prefix like "/galera/k8scluster/seqno/" to get the
    # IP address of each node.
    prefix = key + "/"
    candidates = []
    for child in etcd_client.get(key).children:
        node_ip = str(child.key).replace(prefix, '')
        candidates.append((node_ip, int(child.value)))
    # Stable ascending sort: the last element carries the highest seqno.
    ranked = sorted(candidates, key=lambda item: item[1])
    LOG.debug("ALL seqno is %s", ranked)
    winner = ranked[-1][0]
    LOG.info("Oldest node is %s, am %s", winner, IPADDR)
    return winner
@retry
def _etcd_set(etcd_client, path, value, ttl):
    """Write *value* to ``ETCD_PATH/<path>`` with the given *ttl*."""
    full_key = os.path.join(ETCD_PATH, path)
    etcd_client.set(full_key, value, ttl=ttl)
    LOG.info("Set %s with value '%s'", full_key, value)
def etcd_register_in_path(etcd_client, path, ttl=60):
    """Register this node under *path*, storing the current timestamp."""
    registered_at = time.time()
    _etcd_set(etcd_client, os.path.join(path, IPADDR), registered_at, ttl)
def etcd_set_seqno(etcd_client, ttl):
    """Publish this node's seqno under ``seqno/<IPADDR>``."""
    current_seqno = mysql_get_seqno()
    _etcd_set(etcd_client, os.path.join('seqno', IPADDR), current_seqno, ttl)
def etcd_deregister_in_path(etcd_client, path):
    """Remove this node's entry from ``ETCD_PATH/<path>``.

    A missing key is not an error; it is only logged.
    """
    node_key = os.path.join(ETCD_PATH, path, IPADDR)
    try:
        etcd_client.delete(node_key, recursive=True)
    except etcd.EtcdKeyNotFound:
        LOG.warning("Key %s not exist", node_key)
    else:
        LOG.warning("Deleted key %s", node_key)
def mysql_get_seqno():
    """Read this node's seqno from GRASTATE_FILE.

    Returns:
        The text after the colon on the first line starting with ``seqno``
        (stripped, as a string), or the integer ``-1`` when the file is
        missing or contains no such line.
    """
    if os.path.isfile(GRASTATE_FILE):
        with open(GRASTATE_FILE) as f:
            for line in f:
                if line.startswith('seqno'):
                    return line.partition(':')[2].strip()
        # Previously this path fell off the end and returned None, which was
        # then published to etcd as the node's seqno.
        LOG.warning("No seqno entry in '%s'. Setting seqno to '-1'",
                    GRASTATE_FILE)
    else:
        LOG.warning("Can't find a '%s' file. Setting seqno to '-1'",
                    GRASTATE_FILE)
    return -1
def get_cluster_state(etcd_client):
    """Return the value stored at ``ETCD_PATH/state``, or None if unset."""
    try:
        return etcd_client.read(os.path.join(ETCD_PATH, 'state')).value
    except etcd.EtcdKeyNotFound:
        return None
def check_for_stale_seqno(etcd_client):
    """Delete seqno entries that have no matching queue entry.

    An address listed under ``seqno`` but not under ``queue`` is treated as
    a leftover and its seqno key is removed, so it cannot win the
    "highest seqno" election.

    The original computed ``queue - seqno``, i.e. addresses whose seqno key
    does not exist, so every delete hit a missing key and real leftovers
    were never removed.
    """
    queue_set = set(fetch_status(etcd_client, 'queue'))
    seqno_set = set(fetch_status(etcd_client, 'seqno'))
    # NOTE(review): queue entries are registered with a TTL while seqno
    # entries are not; confirm a live node's queue entry cannot expire
    # before this check runs.
    stale = seqno_set - queue_set
    if not stale:
        # Two separate arguments: passing them as one tuple left the second
        # placeholder without a value and produced a logging error.
        LOG.debug("Found seqno set is equals to the queue set: %s = %s",
                  queue_set, seqno_set)
        return
    LOG.warning("Found stale seqno entries: %s, deleting", stale)
    for ip in stale:
        key = os.path.join(ETCD_PATH, 'seqno', ip)
        try:
            etcd_client.delete(key)
            LOG.warning("Deleted key %s", key)
        except etcd.EtcdKeyNotFound:
            LOG.warning("Key %s not exist", key)
def wait_for_expected_state(etcd_client, ttl):
    """Block until exactly EXPECTED_NODES are queued, then wait for our turn.

    While the queue is too long the check repeats every 10 seconds; while
    it is too short, every second. *ttl* is accepted but not used here.
    """
    while True:
        queued = len(fetch_status(etcd_client, 'queue'))
        if queued == EXPECTED_NODES:
            wait_for_my_turn(etcd_client)
            return
        LOG.debug("Current number of nodes is %s, expected: %s, sleeping",
                  queued, EXPECTED_NODES)
        time.sleep(10 if queued > EXPECTED_NODES else 1)
def wait_for_my_turn(etcd_client):
    """Block until this node holds the highest seqno among queued nodes.

    Stale seqno entries are cleaned up once, then the election result is
    polled every 5 seconds.
    """
    check_for_stale_seqno(etcd_client)
    LOG.info("Waiting for my turn to join cluster")
    while get_oldest_node_by_seqno(etcd_client, 'seqno') != IPADDR:
        time.sleep(5)
    LOG.info("It's my turn to join the cluster")
def wait_for_sync(mysqld):
    """Block until the local node reports wsrep_local_state 4.

    The status is polled every 5 seconds. Query errors are tolerated while
    the *mysqld* process is still running; once it has exited the script
    exits with mysqld's return code.
    """
    while True:
        try:
            wsrep_data = fetch_wsrep_data()
            if int(wsrep_data['wsrep_local_state']) == 4:
                LOG.info("Node synced")
                # If sync was done by SST all files in datadir was lost
                create_init_flag()
                break
            LOG.debug("Waiting node to be synced. Current state is: %s",
                      wsrep_data['wsrep_local_state_comment'])
            time.sleep(5)
        except Exception:
            if mysqld.poll() is not None:
                LOG.error('Mysqld was terminated, exit code was: %s',
                          mysqld.returncode)
                sys.exit(mysqld.returncode)
            # mysqld is still alive, most likely just not ready yet.
            time.sleep(5)
def check_if_im_last(etcd_client):
    """Tell whether this node completes the cluster.

    The queue is read once. The nodes list is re-read every 10 seconds for
    as long as it holds more than EXPECTED_NODES entries. Returns True when
    the queue was empty and exactly EXPECTED_NODES nodes are registered.
    """
    pause = 10
    queue_status = fetch_status(etcd_client, 'queue')
    nodes_status = fetch_status(etcd_client, 'nodes')
    while len(nodes_status) > EXPECTED_NODES:
        LOG.info("Looks like we have stale data in etcd, found %s nodes, "
                 "but expected to find %s, sleeping for %s sec",
                 len(nodes_status), EXPECTED_NODES, pause)
        time.sleep(pause)
        nodes_status = fetch_status(etcd_client, 'nodes')
    if queue_status or len(nodes_status) != EXPECTED_NODES:
        LOG.info("I'm not the last node")
        return False
    LOG.info("Looks like this node is the last one")
    return True
def create_join_list(status):
    """Build the peer list for this node from the registered nodes.

    This node's own address is removed from *status* (in place). Returns a
    ``(peers, first_one)`` tuple: the comma-separated peers and False, or
    an empty string and True when no other node is registered.
    """
    if IPADDR in status:
        status.remove(IPADDR)
    if status:
        peers = ','.join(status)
        LOG.info("Joining to nodes %s", peers)
        return (peers, False)
    LOG.info("No available nodes found. Assuming I'm first")
    return ("", True)
def update_uuid(etcd_client):
    """Store the local ``wsrep_cluster_state_uuid`` in etcd without a TTL."""
    cluster_uuid = fetch_wsrep_data()['wsrep_cluster_state_uuid']
    _etcd_set(etcd_client, 'uuid', cluster_uuid, ttl=None)
def update_cluster_state(etcd_client, state):
    """Store *state* under the ``state`` key in etcd without a TTL."""
    _etcd_set(etcd_client, path='state', value=state, ttl=None)
def wait_for_mysqld(proc):
    """Wait for *proc* to end, then exit the script with its exit code."""
    exit_code = proc.wait()
    LOG.info("Process exited with code %d", exit_code)
    sys.exit(exit_code)
def wait_for_mysqld_to_start(proc, insecure):
    """Wait until the local mysqld answers a trivial query.

    Up to 29 attempts are made, one second apart. Each attempt opens its
    own connection, which is closed again whether or not the query worked.

    Args:
        proc: ``Popen`` object of the started mysqld command; only used to
            report its exit code on failure.
        insecure: passed to get_mysql_client() to select the empty password.

    Raises:
        RuntimeError: mysqld did not answer within the allotted attempts.
    """
    LOG.info("Waiting mysql to start...")
    for _ in range(0, 29):
        mysql_client = None
        try:
            mysql_client = get_mysql_client(insecure=insecure)
            mysql_exec(mysql_client, [("SELECT 1", None)])
            return
        except Exception:
            time.sleep(1)
        finally:
            if mysql_client is not None:
                try:
                    mysql_client.close()
                except Exception:
                    # Best-effort cleanup of a probe connection.
                    pass
    LOG.info("Mysql boot failed")
    # poll() refreshes the exit status; reading returncode without it always
    # yielded None here.
    exit_code = proc.poll()
    if exit_code is None:
        raise RuntimeError("Mysqld did not accept connections in time "
                           "(process is still running)")
    raise RuntimeError("Process exited with code: %s" % exit_code)
def wait_for_mysqld_to_stop():
    """
    Wait until no mysqld process is left, checking with pgrep.

    Since mysqld starts a wrapper first, the exit code of the executed
    process does not prove that mysqld itself has finished, so the process
    list is checked instead. Exits the script with code 1 if mysqld is
    still present after 29 checks, one second apart.
    """
    LOG.info("Waiting for mysqld to finish working")
    for _ in range(0, 29):
        probe = run_cmd("pgrep mysqld")
        probe.communicate()
        if probe.returncode != 0:
            LOG.info("Mysqld finished working")
            return
        time.sleep(1)
    LOG.info("Can't kill the mysqld process used for bootstraping")
    sys.exit(1)
def mysql_init():
    """Initialise an empty data directory and create the service accounts.

    Steps: wipe DATADIR, run ``mysqld --initialize-insecure``, start a
    temporary mysqld without networking, create the root, xtrabackup and
    monitor accounts, write the init marker, then stop the temporary mysqld.

    Raises:
        ProcessException: ``mysqld --initialize-insecure`` failed.
        RuntimeError: the temporary mysqld did not come up.
    """
    datadir_cleanup(DATADIR)
    run_cmd("mysqld --initialize-insecure", check_result=True)
    mysqld_proc = run_cmd("mysqld --skip-networking")
    wait_for_mysqld_to_start(mysqld_proc, insecure=True)
    LOG.info("Mysql is running, setting up the permissions")
    # Passwords are passed as query parameters and escaped by the driver.
    # They must not be shell-quoted as well: that would store the quoting
    # characters as part of the password, and later logins with the real
    # password would fail. Statements with parameters double the literal
    # '%' because the driver applies %-formatting to them.
    sql_list = [("CREATE USER 'root'@'%%' IDENTIFIED BY %s",
                 (MYSQL_ROOT_PASSWORD,)),
                ("GRANT ALL ON *.* TO 'root'@'%' WITH GRANT OPTION", None),
                ("ALTER USER 'root'@'localhost' IDENTIFIED BY %s",
                 (MYSQL_ROOT_PASSWORD,)),
                ("CREATE USER 'xtrabackup'@'localhost' IDENTIFIED BY %s",
                 (XTRABACKUP_PASSWORD,)),
                ("GRANT RELOAD,PROCESS,LOCK TABLES,REPLICATION CLIENT ON *.*"
                 " TO 'xtrabackup'@'localhost'", None),
                ("GRANT REPLICATION CLIENT ON *.* TO monitor@'%%' IDENTIFIED"
                 " BY %s", (MONITOR_PASSWORD,)),
                ("DROP DATABASE IF EXISTS test", None),
                ("FLUSH PRIVILEGES", None)]
    mysql_client = get_mysql_client(insecure=True)
    try:
        mysql_exec(mysql_client, sql_list)
    finally:
        try:
            mysql_client.close()
        except pymysql.Error:
            # Already closed by the driver after a failure.
            pass

    create_init_flag()
    # It's more safe to kill mysqld via pkill, since mysqld start wrapper first
    run_cmd("pkill mysqld")
    wait_for_mysqld_to_stop()
    LOG.info("Mysql bootstraping is done")
def check_cluster(etcd_client):
    """Switch the cluster to RECOVERY when it is STEADY but has no nodes."""
    state = get_cluster_state(etcd_client)
    nodes_status = fetch_status(etcd_client, 'nodes')
    if nodes_status or state != 'STEADY':
        return
    LOG.warning("Cluster is in the STEADY state, but there no"
                " alive nodes detected, running cluster recovery")
    update_cluster_state(etcd_client, 'RECOVERY')
def acquire_lock(lock, ttl):
    """Block until *lock* is held, using *ttl* as the lock TTL."""
    LOG.info("Locking...")
    lock.acquire(lock_ttl=ttl, blocking=True)
    LOG.info("Successfuly acquired lock")
def release_lock(lock):
    """Release *lock* and log that it was released."""
    lock.release()
    LOG.info("Successfuly released lock")
def run_create_queue(etcd_client, lock, ttl):
    """
    Recovery preparation step.

    The node registers itself in the queue and publishes its seqno, then
    releases the lock so the other nodes can do the same. After that it
    waits until the expected number of nodes is queued and until it is its
    own turn, based on the seqno, to start joining the cluster.
    """
    LOG.info("Creating recovery queue")
    etcd_register_in_path(etcd_client, 'queue')
    # The seqno entry is written without a TTL.
    etcd_set_seqno(etcd_client, ttl=None)
    release_lock(lock)
    wait_for_expected_state(etcd_client, ttl)
def run_join_cluster(etcd_client, lock, ttl):
    """
    Join the existing cluster or create a new one.

    The current nodes list decides the role: when it is empty this node is
    the first one and creates the cluster; otherwise it joins the listed
    nodes. When the queue is empty and the nodes list reaches the expected
    size afterwards, this node is the last one.

    In the RECOVERY state the same steps run, but nodes take turns by the
    seqno of their data instead of first come - first served, so the first
    node is the one with the most recent data.

    Returns a ``(first_one, last_one, mysqld)`` tuple, where *mysqld* is the
    started process. The lock is taken at the start and released at the end.
    """
    LOG.info("Joining the cluster")
    acquire_lock(lock, ttl)
    cluster_state = get_cluster_state(etcd_client)
    registered = fetch_status(etcd_client, 'nodes')
    peers, first_one = create_join_list(registered)
    mysqld = run_mysqld(peers)
    wait_for_sync(mysqld)
    # Synced: move this node from the queue to the active nodes list.
    etcd_register_in_path(etcd_client, 'nodes', ttl)
    etcd_deregister_in_path(etcd_client, 'queue')
    if cluster_state == "RECOVERY":
        etcd_deregister_in_path(etcd_client, 'seqno')
    last_one = check_if_im_last(etcd_client)
    release_lock(lock)
    return (first_one, last_one, mysqld)
def run_update_metadata(etcd_client, first_one, last_one):
    """
    Update the cluster state and metadata in etcd.

    The first node stores its uuid as the cluster uuid and, unless the
    cluster is in RECOVERY, sets the state to BUILDING. The last node sets
    the state to STEADY. In a RECOVERY scenario the state is therefore left
    untouched until the cluster is fully rebuilt.
    """
    LOG.info("Update cluster metadata")
    recovering = get_cluster_state(etcd_client) == 'RECOVERY'
    if first_one:
        update_uuid(etcd_client)
        if not recovering:
            update_cluster_state(etcd_client, 'BUILDING')
    if last_one:
        update_cluster_state(etcd_client, 'STEADY')
def main(ttl):
    """Initialise MySQL if needed, join the cluster and supervise mysqld.

    Without the init marker a full MySQL initialisation is done; with it,
    ``mysqld_safe --wsrep-recover`` is run instead. The node then takes the
    ``galera`` etcd lock and follows one of three scenarios depending on
    the cluster state: initial bootstrap, re-connect, or recovery. Finally
    it waits for mysqld and exits with its exit code.

    On the way out, whether through an error or the normal exit, this
    node's queue/nodes/seqno entries are removed and the lock is released.

    Args:
        ttl: TTL used for the lock and for this node's ``nodes`` entry.

    Raises:
        RuntimeError: the cluster state stored in etcd is not recognised.
    """
    if not os.path.isfile(INIT_FILE):
        LOG.info("Init file '%s' not found, doing full init", INIT_FILE)
        mysql_init()
    else:
        LOG.info("Init file '%s' found. Skiping mysql bootstrap and run"
                 " wsrep-recover", INIT_FILE)
        run_cmd("mysqld_safe --wsrep-recover", check_result=True)

    # Bound before the try block so the cleanup below never fails on an
    # unbound name and hides the original error.
    etcd_client = None
    lock = None
    try:
        LOG.debug("My IP is: %s", IPADDR)
        etcd_client = get_etcd_client()
        lock = etcd.Lock(etcd_client, 'galera')
        acquire_lock(lock, ttl)
        check_cluster(etcd_client)
        state = get_cluster_state(etcd_client)

        # Scenario 1: Initial bootstrap
        if state is None or state == 'BUILDING':
            LOG.info("No running cluster detected - starting bootstrap")
            first_one, last_one, mysqld = run_join_cluster(etcd_client, lock,
                                                           ttl)
            run_update_metadata(etcd_client, first_one, last_one)
            LOG.info("Bootsraping is done. Node is ready.")

        # Scenario 2: Re-connect
        elif state == 'STEADY':
            LOG.info("Detected running cluster, re-connecting")
            first_one, last_one, mysqld = run_join_cluster(etcd_client, lock,
                                                           ttl)
            LOG.info("Node joined and ready")

        # Scenario 3: Recovery
        elif state == 'RECOVERY':
            LOG.warning("Cluster is in the RECOVERY state, re-connecting to"
                        " the node with the oldest data")
            run_create_queue(etcd_client, lock, ttl)
            first_one, last_one, mysqld = run_join_cluster(etcd_client, lock,
                                                           ttl)
            run_update_metadata(etcd_client, first_one, last_one)
            etcd_deregister_in_path(etcd_client, 'seqno')
            LOG.info("Recovery is done. Node is ready.")

        else:
            # Without this branch an unexpected state surfaced as an
            # unbound 'mysqld' variable below.
            raise RuntimeError("Unknown cluster state: %r" % (state,))

        # Exits the script with mysqld's exit code; the cleanup still runs.
        wait_for_mysqld(mysqld)
    finally:
        try:
            if etcd_client is not None:
                etcd_deregister_in_path(etcd_client, 'queue')
                etcd_deregister_in_path(etcd_client, 'nodes')
                etcd_deregister_in_path(etcd_client, 'seqno')
        finally:
            # Release the lock even if deregistration failed.
            if lock is not None:
                release_lock(lock)
if __name__ == "__main__":
    # set_globals() loads the configuration itself; the extra get_config()
    # call that used to precede it only read and logged the file a second
    # time and discarded the result.
    set_globals()
    main(ttl=300)
# vim: set ts=4 sw=4 tw=0 et :

View File

@ -1,128 +0,0 @@
#!/bin/bash
# Container entrypoint: initialises the MySQL data directory on first start,
# registers the node in etcd and then runs mysqld as a cluster member.
# NOTE(review): -x traces every command after expansion, which includes the
# lines that carry passwords - confirm this is acceptable for the log.
set -ex
# Forward logs to docker log collector
exec 1>/proc/1/fd/2 2>/proc/1/fd/2
# Values substituted by the template engine when this file is rendered.
MYSQL_ROOT_PASSWORD={{ db.root_password }}
CLUSTER_NAME={{ percona.cluster_name }}
XTRABACKUP_PASSWORD={{ percona.xtrabackup_password }}
MONITOR_PASSWORD={{ percona.monitor_password }}
CURL="curl -sS"
# if the command line starts with an option, keep all arguments in CMDARG;
# they are appended to the final mysqld invocation
if [ "${1:0:1}" = '-' ]; then
CMDARG="$@"
fi
if [ -z "$CLUSTER_NAME" ]; then
echo >&2 'Error: You need to specify CLUSTER_NAME'
exit 1
fi
# Get config: ask mysqld for its effective datadir
DATADIR="$("mysqld" --verbose --wsrep_provider= --help 2>/dev/null | awk '$1 == "datadir" { print $2; exit }')"
# First start: no init.ok marker in the data directory yet.
if [ ! -e "$DATADIR/init.ok" ]; then
if [ -z "$MYSQL_ROOT_PASSWORD" -a -z "$MYSQL_ALLOW_EMPTY_PASSWORD" -a -z "$MYSQL_RANDOM_ROOT_PASSWORD" ]; then
echo >&2 'error: database is uninitialized and password option is not specified '
echo >&2 ' You need to specify one of MYSQL_ROOT_PASSWORD, MYSQL_ALLOW_EMPTY_PASSWORD and MYSQL_RANDOM_ROOT_PASSWORD'
exit 1
fi
# Start from an empty data directory.
rm -rf $DATADIR/*
mkdir -p "$DATADIR"
ls -la "$DATADIR"/
echo "Running --initialize-insecure on $DATADIR"
mysqld --initialize-insecure
echo 'Finished --initialize-insecure'
# Temporary local-only mysqld used to create accounts; stopped further down.
mysqld --user=mysql --datadir="$DATADIR" --skip-networking &
pid="$!"
mysql=( mysql --protocol=socket -uroot )
# Wait up to roughly 30 seconds for the temporary mysqld to answer.
for i in {30..0}; do
if echo 'SELECT 1' | "${mysql[@]}" &> /dev/null; then
break
fi
echo 'MySQL init process in progress...'
sleep 1
done
if [ "$i" = 0 ]; then
echo >&2 'MySQL init process failed.'
exit 1
fi
# sed is for https://bugs.mysql.com/bug.php?id=20545
mysql_tzinfo_to_sql /usr/share/zoneinfo | sed 's/Local time zone must be set--see zic manual page/FCTY/' | "${mysql[@]}" mysql
if [ ! -z "$MYSQL_RANDOM_ROOT_PASSWORD" ]; then
MYSQL_ROOT_PASSWORD="$(pwmake 128)"
echo "GENERATED ROOT PASSWORD: $MYSQL_ROOT_PASSWORD"
fi
# Create the root, xtrabackup and monitor accounts.
"${mysql[@]}" <<-EOSQL
-- What's done in this file shouldn't be replicated
-- or products like mysql-fabric won't work
SET @@SESSION.SQL_LOG_BIN=0;
CREATE USER 'root'@'%' IDENTIFIED BY '${MYSQL_ROOT_PASSWORD}' ;
GRANT ALL ON *.* TO 'root'@'%' WITH GRANT OPTION ;
ALTER USER 'root'@'localhost' IDENTIFIED BY '${MYSQL_ROOT_PASSWORD}';
CREATE USER 'xtrabackup'@'localhost' IDENTIFIED BY '$XTRABACKUP_PASSWORD';
GRANT RELOAD,PROCESS,LOCK TABLES,REPLICATION CLIENT ON *.* TO 'xtrabackup'@'localhost';
GRANT REPLICATION CLIENT ON *.* TO monitor@'%' IDENTIFIED BY '$MONITOR_PASSWORD';
DROP DATABASE IF EXISTS test ;
FLUSH PRIVILEGES ;
EOSQL
# From here on the client has to authenticate with the new root password.
if [ ! -z "$MYSQL_ROOT_PASSWORD" ]; then
mysql+=( -p"${MYSQL_ROOT_PASSWORD}" )
fi
# Optional database and user taken from the environment.
if [ "$MYSQL_DATABASE" ]; then
echo "CREATE DATABASE IF NOT EXISTS \`$MYSQL_DATABASE\` ;" | "${mysql[@]}"
mysql+=( "$MYSQL_DATABASE" )
fi
if [ "$MYSQL_USER" -a "$MYSQL_PASSWORD" ]; then
echo "CREATE USER '"$MYSQL_USER"'@'%' IDENTIFIED BY '"$MYSQL_PASSWORD"' ;" | "${mysql[@]}"
if [ "$MYSQL_DATABASE" ]; then
echo "GRANT ALL ON \`"$MYSQL_DATABASE"\`.* TO '"$MYSQL_USER"'@'%' ;" | "${mysql[@]}"
fi
echo 'FLUSH PRIVILEGES ;' | "${mysql[@]}"
fi
if [ ! -z "$MYSQL_ONETIME_PASSWORD" ]; then
"${mysql[@]}" <<-EOSQL
ALTER USER 'root'@'%' PASSWORD EXPIRE;
EOSQL
fi
# Stop the temporary mysqld and make sure it exited cleanly.
if ! kill -s TERM "$pid" || ! wait "$pid"; then
echo >&2 'MySQL init process failed.'
exit 1
fi
echo
echo 'MySQL init process done. Ready for start up.'
echo
fi
# Mark the data directory as initialised.
touch $DATADIR/init.ok
# Register in etcd; the script's stdout is the list of nodes to join.
available_nodes=$(/opt/ccp/bin/etcd_register.py)
if [ -z "$available_nodes" ]; then
echo "No available nodes found. Assuming Im first"
else
echo "Joining to nodes: $available_nodes"
fi
# Background health check loop that keeps the etcd record up to date.
bash /opt/ccp/bin/clustercheckcron 1 1 &
# NOTE(review): $ipaddr is not assigned anywhere in this script, so
# --wsrep_node_address appears to receive an empty value - confirm.
mysqld --user=mysql --wsrep_cluster_name=$CLUSTER_NAME \
--wsrep_cluster_address="gcomm://$available_nodes" \
--wsrep_sst_method=xtrabackup-v2 \
--wsrep_sst_auth="xtrabackup:$XTRABACKUP_PASSWORD" \
--wsrep_node_address="$ipaddr" \
--pxc_strict_mode=PERMISSIVE \
$CMDARG
# vim: set ts=4 sw=4 tw=0 et :

View File

@ -1,44 +0,0 @@
#!/usr/bin/env python
import sys
import pymysql.cursors
# Connection used by the readiness check, opened at import time as the
# 'monitor' user over TCP to the local mysqld. The password is filled in by
# the template engine. Connect and read timeouts are one second; rows are
# returned as dicts.
connection = pymysql.connect(host='127.0.0.1',
port=3306,
user='monitor',
password='{{ percona.monitor_password }}',
connect_timeout=1,
read_timeout=1,
cursorclass=pymysql.cursors.DictCursor)
def fetch_data(connection):
    """Return all ``wsrep%`` status rows from *connection*.

    Any error while querying ends the script with exit code 1.
    """
    try:
        with connection.cursor() as cur:
            cur.execute("SHOW STATUS LIKE 'wsrep%'")
            return cur.fetchall()
    except Exception:
        sys.exit(1)
def check_galera(connection):
    """Exit 0 when the node is ready to serve, 1 otherwise.

    The node counts as ready only when it is Synced, its EVS state is
    OPERATIONAL, and both ``wsrep_connected`` and ``wsrep_ready`` are ON.
    """
    results = {row['Variable_name']: row['Value']
               for row in fetch_data(connection)}
    required = (
        ("wsrep_local_state_comment", "Synced"),
        ("wsrep_evs_state", "OPERATIONAL"),
        ("wsrep_connected", "ON"),
        ("wsrep_ready", "ON"),
    )
    # Checked in order, stopping at the first mismatch.
    healthy = all(results[name] == wanted for name, wanted in required)
    sys.exit(0 if healthy else 1)
# Run the check on import; check_galera() always ends the process through
# sys.exit().
check_galera(connection)

View File

@ -8,7 +8,7 @@ service:
- name: galera
image: percona
probes:
readiness: "/opt/ccp/bin/percona_readiness.py"
readiness: "true"
liveness:
command: "true"
type: "exec"
@ -30,30 +30,15 @@ service:
files:
- entrypoint
- mycnf
- check
- readiness
- galera-etcd-register
dependencies:
- etcd
command: /opt/ccp/bin/entrypoint.sh
command: /opt/ccp/bin/entrypoint.py
files:
entrypoint:
path: /opt/ccp/bin/entrypoint.sh
content: percona_entrypoint.sh.j2
path: /opt/ccp/bin/entrypoint.py
content: percona_entrypoint.py
perm: "0755"
mycnf:
path: /etc/mysql/my.cnf
content: my.cnf.j2
check:
path: /opt/ccp/bin/clustercheckcron
content: clustercheckcron.j2
perm: "0755"
readiness:
path: /opt/ccp/bin/percona_readiness.py
content: percona_readiness.py.j2
perm: "0750"
galera-etcd-register:
path: /opt/ccp/bin/etcd_register.py
content: etcd_register.py.j2
perm: "0755"