Haproxy container for galera

We add a haproxy side container to the galera pod. We bind galera to port
33306 and haproxy takes its place on 3306. Haproxy starts with a non-working
backend. After it starts, it checks for the current leader in etcd; if there
is none, it sets the current leader to itself. After that it updates its
backend to connect to the current leader.

It repeats this logic every 5 seconds, and if the leader has changed it
updates the backend again.

Other nodes do the same, so, in the end, each node will be connected to the
same, single galera node.

Change-Id: Ieb611661857de1828259b28f54f5a0390b1dd196
This commit is contained in:
Proskurin Kirill 2016-12-27 17:38:09 +00:00
parent 613e47413a
commit a248697bc0
9 changed files with 371 additions and 4 deletions

View File

@ -0,0 +1,13 @@
# Image for the haproxy side container. This file is a Jinja template: the
# {{ ... }} expressions are filled in before the image is built.
FROM {{ image_spec("base-tools") }}
MAINTAINER {{ maintainer }}
# Install the rendered sources list as an extra apt source (testing.list).
COPY {{ render('sources.list.debian.j2') }} /etc/apt/sources.list.d/testing.list
# Sudo rules for the haproxy user.
COPY sudoers /etc/sudoers.d/haproxy_sudoers
# Install haproxy from the "testing" release, hand its config and state
# directories to the haproxy user, and add that user to the microservices
# group.
RUN apt-get update \
&& apt-get install -y -t testing haproxy \
&& apt-get clean \
&& chown -R haproxy: /etc/haproxy /var/lib/haproxy \
&& usermod -a -G microservices haproxy
# Everything after this point runs as the haproxy user.
USER haproxy

View File

@ -0,0 +1,2 @@
# Testing repos
deb {{ url.debian }} testing main

View File

@ -0,0 +1 @@
haproxy ALL=(root) NOPASSWD: /bin/chown -R haproxy\: /run/haproxy, /bin/mkdir /run/haproxy

View File

@ -76,7 +76,7 @@ def get_etcd_client():
@retry
def get_mysql_client():
mysql_client = pymysql.connect(host='127.0.0.1',
port=3306,
port=33306,
user='monitor',
password=MONITOR_PASSWORD,
connect_timeout=1,

View File

@ -0,0 +1,33 @@
# HAProxy configuration template; the {{ ... }} expression in the bind line is
# substituted when the template is rendered.
global
# No syslog in containers
#log /dev/log local0
# Admin-level UNIX socket; the runtime backend update mentioned at the bottom
# of this file goes through it.
stats socket /run/haproxy/admin.sock mode 660 level admin
stats timeout 30s
# Tunes from MOS
tune.bufsize 32768
tune.maxrewrite 1024
# Default SSL material locations
ca-base /etc/ssl/certs
crt-base /etc/ssl/private
ssl-default-bind-ciphers ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:RSA+AESGCM:RSA+AES:!aNULL:!MD5:!DSS
ssl-default-bind-options no-sslv3
defaults
log global
# Plain TCP proxying with MySQL-aware health checks and TCP keep-alives.
mode tcp
option tcplog
option logasap
option dontlognull
option mysql-check
option tcpka
timeout connect 10s
# 28801s is one second more than 8 hours; presumably chosen to outlast the
# MySQL idle-connection timeout -- TODO confirm.
timeout client 28801s
timeout server 28801s
listen galera-cluster
bind 0.0.0.0:{{ percona.port.cont }}
# We start with non-working configuration and update it via admin socket in the runtime
server primary 127.0.0.1:11111 check

View File

@ -0,0 +1,291 @@
#!/usr/bin/env python
import argparse
import functools
import json
import logging
import os
import socket
import subprocess
import sys
import time
import etcd
# Identity of this node, resolved once when the module is loaded.
HOSTNAME = socket.getfqdn()
IPADDR = socket.gethostbyname(HOSTNAME)
# Haproxy backend and server names used when building admin-socket commands.
BACKEND_NAME = "galera-cluster"
SERVER_NAME = "primary"
# JSON file the runtime configuration is read from (see get_config()).
GLOBALS_PATH = '/etc/ccp/globals/globals.json'
LOG_DATEFMT = "%Y-%m-%d %H:%M:%S"
LOG_FORMAT = "%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s"
logging.basicConfig(format=LOG_FORMAT, datefmt=LOG_DATEFMT)
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)
# Placeholders; the real values are assigned by set_globals().
CONNECTION_ATTEMPTS = None
CONNECTION_DELAY = None
ETCD_PATH = None
ETCD_HOST = None
ETCD_PORT = None
# Haproxy constant for health checks
# (compared with 'srv_op_state' and 'srv_check_result' in run_readiness()).
SRV_STATE_RUNNING = 2
SRV_CHK_RES_PASSED = 3
def retry(f):
    """Decorator that re-runs ``f`` when it raises ``etcd.EtcdException``.

    Up to ``CONNECTION_ATTEMPTS - 1`` guarded calls are made, sleeping
    ``CONNECTION_DELAY`` seconds after each failure. The last call is made
    unguarded, so its exception (if any) propagates to the caller.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        pause = CONNECTION_DELAY
        remaining = CONNECTION_ATTEMPTS
        while remaining > 1:
            try:
                result = f(*args, **kwargs)
            except etcd.EtcdException as e:
                LOG.warning('Etcd is not ready: %s', str(e))
                LOG.warning('Retrying in %d seconds...', pause)
                time.sleep(pause)
                remaining -= 1
            else:
                return result
        # Final attempt: any error is the caller's problem now.
        return f(*args, **kwargs)
    return wrapper
def get_config():
    """Load the globals file and return the sections this script uses.

    Returns a dict holding the 'percona', 'etcd' and 'namespace' entries of
    the JSON document at ``GLOBALS_PATH``. A missing entry raises KeyError.
    """
    LOG.info("Getting global variables from %s", GLOBALS_PATH)
    with open(GLOBALS_PATH) as globals_file:
        loaded = json.load(globals_file)
    wanted = ('percona', 'etcd', 'namespace')
    variables = {name: loaded[name] for name in wanted}
    LOG.debug(variables)
    return variables
def set_globals():
    """Fill the module-level connection settings from the loaded config."""
    global CONNECTION_ATTEMPTS, CONNECTION_DELAY
    global ETCD_PATH, ETCD_HOST, ETCD_PORT

    config = get_config()
    etcd_conf = config['etcd']

    # Retry policy used by the @retry decorator.
    CONNECTION_ATTEMPTS = etcd_conf['connection_attempts']
    CONNECTION_DELAY = etcd_conf['connection_delay']
    # Where this cluster keeps its keys and how to reach etcd.
    ETCD_PATH = "/galera/%s" % config['percona']['cluster_name']
    ETCD_HOST = "etcd.%s" % config['namespace']
    ETCD_PORT = int(etcd_conf['client_port']['cont'])
def get_etcd_client():
    """Build an etcd client from the module-level ETCD_HOST / ETCD_PORT."""
    options = {
        'host': ETCD_HOST,
        'port': ETCD_PORT,
        'allow_reconnect': True,
        'read_timeout': 2,
    }
    return etcd.Client(**options)
def get_socket():
    """Return a stream socket connected to the haproxy admin UNIX socket.

    The socket gets a 5 second timeout before connecting.
    """
    admin_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    admin_sock.settimeout(5)
    admin_sock.connect('/var/run/haproxy/admin.sock')
    return admin_sock
def run_haproxy():
    """Start haproxy as a child process and return its Popen handle."""
    cmd = ["haproxy", "-f", "/etc/haproxy/haproxy.conf"]
    LOG.info("Executing cmd:\n%s", cmd)
    return subprocess.Popen(cmd)
def check_haproxy(proc):
    """Exit this script with haproxy's exit code if haproxy has stopped.

    ``proc`` is the Popen handle returned by run_haproxy(). Nothing happens
    while the child process is still running.
    """
    if proc.poll() is None:
        return
    LOG.error("Haproxy was terminated, exit code was: %s",
              proc.returncode)
    sys.exit(proc.returncode)
@retry
def etcd_set(etcd_client, key, value, ttl, dir=False, append=False, **kwargs):
    """Write ``value`` under ``key`` in etcd and log the write.

    ``ttl``, ``dir``, ``append`` and any extra keyword arguments are handed
    to the client's write() call. Etcd errors are retried by @retry.
    """
    etcd_client.write(key, value, ttl=ttl, dir=dir, append=append, **kwargs)
    LOG.info("Set %s with value '%s'", key, value)
@retry
def etcd_refresh(etcd_client, path, ttl):
    """Refresh the ttl of the key ``path`` located under ``ETCD_PATH``.

    ``path`` is joined onto ``ETCD_PATH`` with os.path.join, which returns an
    absolute ``path`` unchanged. Etcd errors are retried by @retry.
    """
    full_key = os.path.join(ETCD_PATH, path)
    etcd_client.refresh(full_key, ttl)
    LOG.info("Refreshed %s ttl. New ttl is '%s'", full_key, ttl)
def send_command(cmd):
    """Send one command to the haproxy admin socket and return its reply.

    A fresh connection is opened for every command. The reply is read until
    haproxy closes the connection and is returned as a list of lines.
    The socket is always closed, even when sending or reading fails.
    """
    LOG.debug("Sending '%s' cmd to haproxy", cmd)
    sock = get_socket()
    try:
        # sendall() keeps writing until the whole command is out, and the
        # explicit encode keeps the payload valid on both Python 2 and 3.
        sock.sendall((cmd + '\n').encode('utf-8'))
        reader = sock.makefile()
        try:
            return reader.read().splitlines()
        finally:
            reader.close()
    finally:
        sock.close()
def get_haproxy_status():
    """Return the haproxy server state as a dict, plus its backend address.

    Keys and values come from the header and first data line of
    "show servers state galera-cluster". The extra 'backend' key holds the
    "ip:port" taken from the last "show stat typed" line containing "addr".
    """
    state_lines = send_command("show servers state galera-cluster")
    typed_stats = send_command("show stat typed")

    # Address lines look like:
    # 'S.2.1.73.addr.1:CGS:str:10.233.76.104:33306'
    for entry in typed_stats:
        if "addr" in entry:
            ip, port = entry.split(':')[-2:]

    # Line 1 is the column header; its first token is dropped so the column
    # names line up with the values found on line 2.
    columns = state_lines[1].split(' ')[1:]
    row = state_lines[2].split(' ')
    status = dict(zip(columns, row))
    status['backend'] = "%s:%s" % (ip, port)
    return status
def get_cluster_state(etcd_client):
    """Return the value of the cluster 'state' key, or None if it is absent."""
    state_key = os.path.join(ETCD_PATH, 'state')
    try:
        node = etcd_client.read(state_key)
    except etcd.EtcdKeyNotFound:
        return None
    return node.value
def wait_for_cluster_to_be_steady(etcd_client, haproxy_proc):
    """Block until the cluster state stored in etcd is 'STEADY'.

    While waiting, haproxy is checked every 5 seconds; check_haproxy() exits
    the script if haproxy has died in the meantime.
    """
    while get_cluster_state(etcd_client) != 'STEADY':
        check_haproxy(haproxy_proc)
        LOG.warning("Cluster is not in the STEADY state, waiting...")
        time.sleep(5)
def set_server_addr(leader_ip):
    """Point the haproxy server at ``leader_ip``:33306 via the admin socket.

    Two commands are sent in order: one sets the server address and port,
    the other sets the health-check port. Each command is repeated until
    haproxy stops answering with the port-conversion error.
    """
    cmds = ["set server %s/%s addr %s port 33306" % (
            BACKEND_NAME, SERVER_NAME, leader_ip),
            "set server %s/%s check-port 33306" % (
            BACKEND_NAME, SERVER_NAME)]
    for cmd in cmds:
        # Bug in haproxy. Sometimes, haproxy can't convert port str to int.
        # Will be fixed in 1.7.2
        while True:
            response = send_command(cmd)
            # An empty reply cannot be the conversion error, so treat it
            # as accepted instead of failing on response[0].
            if response and "problem converting port" in response[0]:
                LOG.error("Port convertation failed, trying again...")
                time.sleep(1)
            else:
                # Move on to the next command; returning here would skip
                # the check-port update.
                break
    LOG.info("Successfuly set backend to %s:33306", leader_ip)
def get_leader(etcd_client):
    """Return the value of the 'leader' key, or None when no leader is set."""
    leader_key = os.path.join(ETCD_PATH, 'leader')
    leader = None
    try:
        leader = etcd_client.read(leader_key).value
    except etcd.EtcdKeyNotFound:
        pass
    LOG.info("Current leader is: %s", leader)
    return leader
def set_leader(etcd_client, ttl, **kwargs):
    """Write this node's IP address into the 'leader' key with ``ttl``.

    Extra keyword arguments are forwarded to etcd_set().
    """
    leader_key = os.path.join(ETCD_PATH, 'leader')
    etcd_set(etcd_client, leader_key, IPADDR, ttl, **kwargs)
def refresh_leader(etcd_client, ttl):
    """Extend the ttl of the 'leader' key to ``ttl``."""
    # etcd_refresh() prefixes ETCD_PATH itself, so pass only the relative
    # key name instead of a path that has already been joined.
    etcd_refresh(etcd_client, 'leader', ttl)
def do_we_need_to_reconfigure_haproxy(leader):
    """Return True when haproxy's backend is not ``leader`` on port 33306."""
    current_backend = get_haproxy_status()['backend']
    expected_backend = leader + ":33306"
    LOG.debug("Haproxy server is: %s. Current leader is: %s",
              current_backend, expected_backend)
    return current_backend != expected_backend
def run_daemon(ttl):
    """Start haproxy and keep its backend pointed at the elected leader.

    Every 5 seconds: wait for the cluster to be STEADY, read the leader key,
    claim it with ``ttl`` if it is missing (or refresh it if this node owns
    it), and reconfigure haproxy when its backend differs from the leader.
    This function does not return; check_haproxy() exits the script when
    haproxy dies.
    """
    LOG.debug("My IP is: %s", IPADDR)
    haproxy_proc = run_haproxy()
    etcd_client = get_etcd_client()
    while True:
        wait_for_cluster_to_be_steady(etcd_client, haproxy_proc)
        leader = get_leader(etcd_client)
        if not leader:
            try:
                # prevExist=False makes the write fail when another node
                # has created the key since we read it.
                set_leader(etcd_client, ttl, prevExist=False)
                leader = IPADDR
            except etcd.EtcdAlreadyExist:
                # Lost the election race: follow whoever won instead of
                # crashing the daemon.
                LOG.info("Leader key already exists, re-reading it")
                leader = get_leader(etcd_client)
                if not leader:
                    # The key vanished again; start the round over.
                    continue
        elif leader == IPADDR:
            refresh_leader(etcd_client, ttl)
        if do_we_need_to_reconfigure_haproxy(leader):
            LOG.info("Updating haproxy configuration")
            set_server_addr(leader)
        check_haproxy(haproxy_proc)
        LOG.info("Sleeping for 5 sec...")
        time.sleep(5)
def run_readiness():
    """Readiness probe: exit 0 when haproxy serves the live leader, else 1.

    The probe fails when the cluster is not STEADY, when no leader is set,
    when haproxy's backend is not the leader, or when haproxy does not
    report the server as running with a passed health check.
    """
    etcd_client = get_etcd_client()
    state = get_cluster_state(etcd_client)
    if state != 'STEADY':
        LOG.error("Cluster is not in the STEADY state")
        sys.exit(1)
    leader = get_leader(etcd_client)
    if not leader:
        LOG.error("No leader found")
        sys.exit(1)
    if do_we_need_to_reconfigure_haproxy(leader):
        LOG.error("Haproxy configuration is wrong")
        sys.exit(1)
    haproxy_stat = get_haproxy_status()
    LOG.debug(haproxy_stat)
    # Both conditions must hold for the leader to be usable, so failing
    # either one makes the probe fail.
    if (int(haproxy_stat['srv_op_state']) != SRV_STATE_RUNNING or
            int(haproxy_stat['srv_check_result']) != SRV_CHK_RES_PASSED):
        LOG.error("Current leader is not alive")
        sys.exit(1)
    LOG.info("Service is ready")
    sys.exit(0)
if __name__ == "__main__":
    # Entry point: the single positional argument selects the mode.
    parser = argparse.ArgumentParser()
    parser.add_argument('type', choices=['daemon', 'readiness'])
    args = parser.parse_args()
    # set_globals() loads the globals file itself, so no separate
    # get_config() call is needed here.
    set_globals()
    if args.type == 'daemon':
        run_daemon(ttl=20)
    elif args.type == 'readiness':
        run_readiness()
# vim: set ts=4 sw=4 tw=0 et :

View File

@ -1,6 +1,6 @@
[mysqld]
bind-address = 0.0.0.0
port = {{ percona.port.cont }}
port = 33306
datadir = /var/lib/mysql
pid-file = /var/lib/mysql/mysqld.pid

View File

@ -255,11 +255,14 @@ def etcd_set_seqno(etcd_client, ttl):
_etcd_set(etcd_client, key, seqno, ttl)
def etcd_deregister_in_path(etcd_client, path):
def etcd_deregister_in_path(etcd_client, path, prevValue=False):
key = os.path.join(ETCD_PATH, path, IPADDR)
try:
etcd_client.delete(key, recursive=True)
if prevValue:
etcd_client.delete(key, prevValue=prevValue)
else:
etcd_client.delete(key, recursive=True)
LOG.warning("Deleted key %s", key)
except etcd.EtcdKeyNotFound:
LOG.warning("Key %s not exist", key)
@ -632,6 +635,7 @@ def main(ttl):
etcd_deregister_in_path(etcd_client, 'queue')
etcd_deregister_in_path(etcd_client, 'nodes')
etcd_deregister_in_path(etcd_client, 'seqno')
etcd_deregister_in_path(etcd_client, 'leader', prevValue=IPADDR)
release_lock(lock)

View File

@ -18,6 +18,22 @@ service:
dependencies:
- etcd
command: "/opt/ccp/bin/galera_checker.py liveness"
- name: galera-haproxy
image: galera-haproxy
probes:
readiness: "/opt/ccp/bin/haproxy_entrypoint.py readiness"
pre:
- name: mkdir-run
command: "sudo /bin/mkdir /run/haproxy"
- name: chown-run
command: "sudo /bin/chown -R haproxy: /run/haproxy"
daemon:
files:
- haproxy-conf
- haproxy_entrypoint
dependencies:
- etcd
command: "/opt/ccp/bin/haproxy_entrypoint.py daemon"
- name: galera
image: percona
probes:
@ -63,3 +79,10 @@ files:
path: /opt/ccp/bin/galera_checker.py
content: galera_checker.py
perm: "0755"
haproxy-conf:
path: /etc/haproxy/haproxy.conf
content: haproxy.conf.j2
haproxy_entrypoint:
path: /opt/ccp/bin/haproxy_entrypoint.py
content: haproxy_entrypoint.py
perm: "0755"