Cassandra Support
1. devstack script 2. cassandra driver It is the first step on support of Cassandra. The driver is implemented by CQL statements. Change-Id: If2f9d37b1410ceaa09de2555397f1a21d2a1db4b Implements: blueprint cassandra-support
This commit is contained in:
parent
013b7e34fb
commit
f7b600f710
|
@ -0,0 +1,171 @@
|
|||
#!/bin/bash
|
||||
#
|
||||
#
|
||||
# ``plugin.sh`` calls the following methods in the sourced driver:
|
||||
#
|
||||
# - nb_db_driver_install_server
|
||||
# - nb_db_driver_install_client
|
||||
# - nb_db_driver_start_server
|
||||
# - nb_db_driver_stop_server
|
||||
# - nb_db_driver_clean
|
||||
# - nb_db_driver_configure
|
||||
|
||||
HOSTNAME=`hostname -f`
|
||||
|
||||
if is_ubuntu ; then
|
||||
UBUNTU_RELEASE_BASE_NUM=`lsb_release -r | awk '{print $2}' | cut -d '.' -f 1`
|
||||
fi
|
||||
|
||||
CASSANDRA_HOME="/etc/cassandra"
|
||||
CASSANDRA_DATA_HOME="/var/lib/cassandra"
|
||||
CASSANDRA_DEB_SOURCE_FILE="/etc/apt/sources.list.d/cassandra.list"
|
||||
CASSANDRA_RPM_SOURCE_FILE="/etc/yum.repos.d/cassandra.repo"
|
||||
|
||||
CASSANDRA_DEFAULT_KEYSPACE="openstack"
|
||||
# By default, the cassandra uses one replication for the all-in-one deployment
|
||||
CASSANDRA_DEFAULT_REPLICATION=1
|
||||
CASSANDRA_DEFAULT_CONSISTENCY_LEVEL="one"
|
||||
|
||||
# Cassandra service startup/cleanup duration
|
||||
CASSANDRA_SERVICE_CHECK_REPLAY=5
|
||||
|
||||
# The seeds of cassandra (the cassandra hosts to form a cluster) should
|
||||
# be specified in the configuration file. In order to generate the ip list
|
||||
# of the cluster, string manipulation is needed here to get the right
|
||||
# format of the seeds.
|
||||
CASSANDRA_CLUSTER=$REMOTE_DB_HOSTS
|
||||
CASSANDRA_NUM_OF_HOSTS_IN_CLUSTER=${CASSANDRA_NUM_OF_HOSTS:-1}
|
||||
CASSANDRA_TEMP_FILE="/tmp/cassandra_hosts"
|
||||
echo $CASSANDRA_CLUSTER > $CASSANDRA_TEMP_FILE
|
||||
IPS=''
|
||||
for ((i=1;i<=$CASSANDRA_NUM_OF_HOSTS_IN_CLUSTER;i++))
|
||||
do
|
||||
ip=`cut -d ',' -f $i < $CASSANDRA_TEMP_FILE | cut -d ':' -f 1`
|
||||
IPS=$IPS','$ip
|
||||
done
|
||||
CASSANDRA_CLUSTER_IPS=${IPS#*","}
|
||||
rm $CASSANDRA_TEMP_FILE
|
||||
# End
|
||||
|
||||
if is_ubuntu; then
|
||||
CASSANDRA_CONF_DIR="$CASSANDRA_HOME"
|
||||
elif is_fedora; then
|
||||
CASSANDRA_CONF_DIR="$CASSANDRA_HOME/conf"
|
||||
else
|
||||
die $LINENO "Other distributions are not supported"
|
||||
fi
|
||||
CASSANDRA_CONF_FILE="$CASSANDRA_CONF_DIR/cassandra.yaml"
|
||||
|
||||
function _cassandra_create_keyspace {
|
||||
keyspace="CREATE KEYSPACE IF NOT EXISTS $CASSANDRA_DEFAULT_KEYSPACE "
|
||||
replica="WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : $CASSANDRA_DEFAULT_REPLICATION } "
|
||||
durable="AND DURABLE_WRITES = true;"
|
||||
cqlsh $HOST_IP -e "$keyspace$replica$durable"
|
||||
}
|
||||
|
||||
function _cassandra_drop_keyspace {
|
||||
cqlsh $HOST_IP -e "DROP KEYSPACE IF EXISTS $CASSANDRA_DEFAULT_KEYSPACE;"
|
||||
}
|
||||
|
||||
function nb_db_driver_install_server {
|
||||
if is_service_enabled df-cassandra-server ; then
|
||||
echo "Installing Cassandra server"
|
||||
if is_ubuntu; then
|
||||
sudo tee -a $CASSANDRA_DEB_SOURCE_FILE >/dev/null <<'EOF'
|
||||
deb http://debian.datastax.com/datastax-ddc 3.9 main
|
||||
EOF
|
||||
curl -L https://debian.datastax.com/debian/repo_key | sudo apt-key add -
|
||||
sudo apt-get update -y
|
||||
install_package openjdk-8-jre-headless
|
||||
elif is_fedora; then
|
||||
sudo tee -a $CASSANDRA_RPM_SOURCE_FILE >/dev/null <<'EOF'
|
||||
[datastax-ddc]
|
||||
name = DataStax Repo for Apache Cassandra
|
||||
baseurl = http://rpm.datastax.com/datastax-ddc/3.9
|
||||
enabled = 1
|
||||
gpgcheck = 0
|
||||
EOF
|
||||
sudo yum update -y
|
||||
install_package java-1.8.0-openjdk-headless
|
||||
fi
|
||||
|
||||
install_package datastax-ddc
|
||||
echo "Configuring Cassandra"
|
||||
sudo sed -i "s/127.0.0.1/${CASSANDRA_CLUSTER_IPS}/g" $CASSANDRA_CONF_FILE
|
||||
sudo sed -i "/^listen_address:/c listen_address: ${HOST_IP}" $CASSANDRA_CONF_FILE
|
||||
sudo sed -i "/^rpc_address:/c rpc_address:" $CASSANDRA_CONF_FILE
|
||||
sudo sed -i "/^broadcast_address:/c broadcast_address:" $CASSANDRA_CONF_FILE
|
||||
# change ownership for data directory
|
||||
sudo chown -R cassandra:cassandra $CASSANDRA_DATA_HOME
|
||||
# start cassandra service
|
||||
nb_db_driver_start_server
|
||||
# initialize keyspace
|
||||
_cassandra_create_keyspace
|
||||
fi
|
||||
}
|
||||
|
||||
function nb_db_driver_install_client {
|
||||
echo 'Cassandra client sdk is in the requirements file.'
|
||||
}
|
||||
|
||||
function nb_db_driver_status_server
|
||||
{
|
||||
if is_service_enabled df-cassandra-server ; then
|
||||
TEMP_PIDS=`pgrep -f "cassandra"`
|
||||
if [ -z "$TEMP_PIDS" ]; then
|
||||
return 1
|
||||
fi
|
||||
fi
|
||||
return 0
|
||||
}
|
||||
|
||||
function _check_cassandra_status {
|
||||
times=0
|
||||
# Initially Cassandra needs long duration to startup/cleanup
|
||||
sleep 20
|
||||
|
||||
# Check the Cassandra cluster UP and Normal
|
||||
result=$(nodetool -h $HOST_IP status | grep $HOST_IP | grep 'UN' | wc -l)
|
||||
while [[ $result -lt 1 ]]
|
||||
do
|
||||
sleep 10
|
||||
result=$(nodetool -h $HOST_IP status | grep $HOST_IP | grep 'UN' | wc -l)
|
||||
times=`expr $times + 1`
|
||||
if [[ $times > $CASSANDRA_SERVICE_CHECK_REPLAY ]];
|
||||
then
|
||||
echo "Cassandra Restart Error!"
|
||||
return 1
|
||||
fi
|
||||
done
|
||||
return 0
|
||||
}
|
||||
|
||||
function nb_db_driver_start_server {
|
||||
if is_service_enabled df-cassandra-server ; then
|
||||
sudo /etc/init.d/cassandra restart
|
||||
_check_cassandra_status
|
||||
fi
|
||||
}
|
||||
|
||||
function nb_db_driver_stop_server {
|
||||
if is_service_enabled df-cassandra-server ; then
|
||||
sudo /etc/init.d/cassandra stop
|
||||
fi
|
||||
}
|
||||
|
||||
function nb_db_driver_clean {
|
||||
nb_db_driver_start_server
|
||||
_cassandra_drop_keyspace
|
||||
nb_db_driver_stop_server
|
||||
|
||||
if is_ubuntu || is_fedora; then
|
||||
uninstall_package -y datastax-ddc
|
||||
fi
|
||||
sudo rm -rf ${CASSANDRA_HOME}
|
||||
sudo rm -rf ${CASSANDRA_DATA_HOME}
|
||||
}
|
||||
|
||||
function nb_db_driver_configure {
|
||||
# set consistency level
|
||||
iniset $DRAGONFLOW_CONF df-cassandra consistency_level "$CASSANDRA_DEFAULT_CONSISTENCY_LEVEL"
|
||||
}
|
|
@ -8,6 +8,7 @@
|
|||
# - nb_db_driver_start_server
|
||||
# - nb_db_driver_stop_server
|
||||
# - nb_db_driver_clean
|
||||
# - nb_db_driver_configure
|
||||
|
||||
ETCD_VERSION=${ETCD_VERSION:-v3.0.15}
|
||||
OVERRIDE_FILE=$DEST/dragonflow/devstack/etcd.override
|
||||
|
@ -105,3 +106,7 @@ function nb_db_driver_clean {
|
|||
sudo rm /lib/systemd/system/etcd.service
|
||||
sudo rm /usr/local/bin/etcd
|
||||
}
|
||||
|
||||
function nb_db_driver_configure {
|
||||
:
|
||||
}
|
||||
|
|
|
@ -75,6 +75,11 @@ if is_service_enabled df-zookeeper ; then
|
|||
source $DEST/dragonflow/devstack/zookeeper_driver
|
||||
NB_DRIVER_CLASS="zookeeper_nb_db_driver"
|
||||
fi
|
||||
if is_service_enabled df-cassandra ; then
|
||||
is_df_db_driver_selected && die $LINENO "More than one database service is set for Dragonflow."
|
||||
source $DEST/dragonflow/devstack/cassandra_driver
|
||||
NB_DRIVER_CLASS="cassandra_nb_db_driver"
|
||||
fi
|
||||
if is_service_enabled df-redis ; then
|
||||
is_df_db_driver_selected && die $LINENO "More than one database service is set for Dragonflow."
|
||||
source $DEST/dragonflow/devstack/redis_driver
|
||||
|
@ -242,7 +247,6 @@ function configure_df_plugin {
|
|||
|
||||
iniset $DRAGONFLOW_CONF df enable_selective_topology_distribution \
|
||||
"$DF_SELECTIVE_TOPO_DIST"
|
||||
|
||||
configure_df_metadata_service
|
||||
}
|
||||
|
||||
|
@ -260,7 +264,6 @@ function install_zeromq {
|
|||
}
|
||||
|
||||
function install_df {
|
||||
|
||||
install_zeromq
|
||||
|
||||
if function_exists nb_db_driver_install_server; then
|
||||
|
@ -447,6 +450,10 @@ if [[ "$Q_ENABLE_DRAGONFLOW_LOCAL_CONTROLLER" == "True" ]]; then
|
|||
elif [[ "$1" == "stack" && "$2" == "post-config" ]]; then
|
||||
configure_ovs
|
||||
configure_df_plugin
|
||||
# configure nb db driver
|
||||
if function_exists nb_db_driver_configure; then
|
||||
nb_db_driver_configure
|
||||
fi
|
||||
# initialize the nb db
|
||||
init_nb_db
|
||||
|
||||
|
@ -475,7 +482,9 @@ if [[ "$Q_ENABLE_DRAGONFLOW_LOCAL_CONTROLLER" == "True" ]]; then
|
|||
if [[ "$1" == "unstack" ]]; then
|
||||
stop_df_metadata_agent
|
||||
stop_df
|
||||
if function_exists nb_db_driver_clean; then
|
||||
nb_db_driver_clean
|
||||
fi
|
||||
cleanup_ovs
|
||||
stop_ovs
|
||||
uninstall_ovs
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
# - nb_db_driver_start_server
|
||||
# - nb_db_driver_stop_server
|
||||
# - nb_db_driver_clean
|
||||
# - nb_db_driver_configure
|
||||
|
||||
RAMCLOUD=$DEST/ramcloud
|
||||
RAMCLOUD_LIB=$RAMCLOUD/lib
|
||||
|
@ -78,3 +79,7 @@ function nb_db_driver_stop_server {
|
|||
function nb_db_driver_clean {
|
||||
sudo rm -rf $RAMCLOUD
|
||||
}
|
||||
|
||||
function nb_db_driver_configure {
|
||||
:
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
# - nb_db_driver_start_server
|
||||
# - nb_db_driver_stop_server
|
||||
# - nb_db_driver_clean
|
||||
# - nb_db_driver_configure
|
||||
|
||||
REDIS_VERSION=3.0.6
|
||||
RUBY_VERSION=2.3
|
||||
|
@ -165,3 +166,7 @@ function nb_db_driver_clean {
|
|||
fi
|
||||
fi
|
||||
}
|
||||
|
||||
function nb_db_driver_configure {
|
||||
:
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
# - nb_db_driver_start_server
|
||||
# - nb_db_driver_stop_server
|
||||
# - nb_db_driver_clean
|
||||
# - nb_db_driver_configure
|
||||
|
||||
HOSTNAME=`hostname -f`
|
||||
|
||||
|
@ -99,3 +100,7 @@ function nb_db_driver_stop_server {
|
|||
stop_service zookeeper
|
||||
fi
|
||||
}
|
||||
|
||||
function nb_db_driver_configure {
|
||||
:
|
||||
}
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
[[local|localrc]]
|
||||
|
||||
LOGFILE=$DEST/logs/stack.sh.log
|
||||
|
||||
Q_ENABLE_DRAGONFLOW_LOCAL_CONTROLLER=True
|
||||
DF_SELECTIVE_TOPO_DIST=True
|
||||
DF_PUB_SUB=True
|
||||
ENABLE_PORT_STATUS_NOTIFIER=False
|
||||
|
||||
DATABASE_PASSWORD=password
|
||||
RABBIT_PASSWORD=password
|
||||
SERVICE_PASSWORD=password
|
||||
SERVICE_TOKEN=password
|
||||
ADMIN_PASSWORD=password
|
||||
|
||||
enable_plugin dragonflow http://git.openstack.org/openstack/dragonflow
|
||||
|
||||
disable_all_services
|
||||
enable_service n-cpu
|
||||
enable_service df-cassandra
|
||||
enable_service df-controller
|
||||
enable_service n-novnc
|
||||
|
||||
# Enable df-metadata (Dragonflow metadata service proxy) once nova is being used.
|
||||
enable_service df-metadata
|
||||
|
||||
# Set this to the address of the main DevStack host running the rest of the
|
||||
# OpenStack services. (Controller node)
|
||||
|
||||
SERVICE_HOST=<IP address of host running everything else>
|
||||
RABBIT_HOST=$SERVICE_HOST
|
||||
Q_HOST=$SERVICE_HOST
|
||||
|
||||
# Specify Cassandra server or cluster
|
||||
# When deploying Cassandra cluster, you can use ',' to specify multiple servers.
|
||||
REMOTE_DB_HOSTS=$SERVICE_HOST:9042
|
||||
CASSANDRA_NUM_OF_HOSTS=1
|
||||
|
||||
# Make VNC work on compute node
|
||||
NOVA_VNC_ENABLED=True
|
||||
NOVNCPROXY_URL=http://$SERVICE_HOST:6080/vnc_auto.html
|
||||
VNCSERVER_LISTEN=$HOST_IP
|
||||
VNCSERVER_PROXYCLIENT_ADDRESS=$VNCSERVER_LISTEN
|
||||
|
||||
[[post-config|$NEUTRON_CONF]]
|
||||
[df]
|
||||
enable_df_pub_sub = True
|
||||
pub_sub_driver = "zmq_pubsub_driver"
|
|
@ -0,0 +1,42 @@
|
|||
[[local|localrc]]
|
||||
|
||||
LOGFILE=$DEST/logs/stack.sh.log
|
||||
|
||||
Q_ENABLE_DRAGONFLOW_LOCAL_CONTROLLER=True
|
||||
DF_SELECTIVE_TOPO_DIST=True
|
||||
DF_PUB_SUB=True
|
||||
ENABLE_PORT_STATUS_NOTIFIER=False
|
||||
|
||||
DATABASE_PASSWORD=password
|
||||
RABBIT_PASSWORD=password
|
||||
SERVICE_PASSWORD=password
|
||||
SERVICE_TOKEN=password
|
||||
ADMIN_PASSWORD=password
|
||||
|
||||
enable_plugin dragonflow http://git.openstack.org/openstack/dragonflow
|
||||
enable_service df-cassandra
|
||||
enable_service df-cassandra-server
|
||||
enable_service df-controller
|
||||
|
||||
disable_service n-net
|
||||
enable_service q-svc
|
||||
enable_service df-l3-agent
|
||||
disable_service heat
|
||||
disable_service tempest
|
||||
|
||||
# Enable df-metadata (Dragonflow metadata service proxy) once nova is being used.
|
||||
enable_service df-metadata
|
||||
|
||||
# We have to disable the neutron L2 agent. DF does not use the L2 agent.
|
||||
disable_service q-agt
|
||||
|
||||
# We have to disable the neutron dhcp agent. DF does not use the dhcp agent.
|
||||
disable_service q-dhcp
|
||||
|
||||
# Specify Cassandra server or cluster
|
||||
# When deploying Cassandra cluster, you can use ',' to specify multiple servers.
|
||||
REMOTE_DB_HOSTS=$HOST_IP:9042
|
||||
CASSANDRA_NUM_OF_HOSTS=1
|
||||
|
||||
# The build-in PUB/SUB mechanism is mandatory for Zookeeper backend.
|
||||
enable_service df-zmq-publisher-service
|
|
@ -0,0 +1,45 @@
|
|||
[[local|localrc]]
|
||||
|
||||
LOGFILE=$DEST/logs/stack.sh.log
|
||||
|
||||
#OFFLINE=True
|
||||
#RECLONE=False
|
||||
|
||||
Q_ENABLE_DRAGONFLOW_LOCAL_CONTROLLER=True
|
||||
DF_SELECTIVE_TOPO_DIST=True
|
||||
DF_PUB_SUB=True
|
||||
ENABLE_PORT_STATUS_NOTIFIER=False
|
||||
|
||||
DATABASE_PASSWORD=password
|
||||
RABBIT_PASSWORD=password
|
||||
SERVICE_PASSWORD=password
|
||||
SERVICE_TOKEN=password
|
||||
ADMIN_PASSWORD=password
|
||||
|
||||
enable_plugin dragonflow http://git.openstack.org/openstack/dragonflow
|
||||
enable_service df-cassandra
|
||||
enable_service df-cassandra-server
|
||||
enable_service df-controller
|
||||
|
||||
disable_service n-net
|
||||
enable_service q-svc
|
||||
enable_service df-l3-agent
|
||||
disable_service heat
|
||||
disable_service tempest
|
||||
|
||||
# We have to disable the neutron L2 agent. DF does not use the L2 agent.
|
||||
disable_service q-agt
|
||||
|
||||
# We have to disable the neutron dhcp agent. DF does not use the dhcp agent.
|
||||
disable_service q-dhcp
|
||||
|
||||
# Enable df-metadata (Dragonflow metadata service proxy) once nova is being used.
|
||||
enable_service df-metadata
|
||||
|
||||
# Specify Cassandra server or cluster
|
||||
# When deploying Cassandra cluster, you can use ',' to specify multiple servers.
|
||||
REMOTE_DB_HOSTS=$HOST_IP:9042
|
||||
CASSANDRA_NUM_OF_HOSTS=1
|
||||
|
||||
# The build-in PUB/SUB mechanism is mandatory for Cassandra backend.
|
||||
enable_service df-zmq-publisher-service
|
|
@ -93,3 +93,7 @@ class DFMultipleExceptions(exceptions.MultipleExceptions):
|
|||
|
||||
class UnknownResourceException(DragonflowException):
|
||||
message = _('Could not find lock id for resource type %(resource_type)')
|
||||
|
||||
|
||||
class InvalidDBHostConfiguration(DragonflowException):
|
||||
message = _('The DB host string %(host)s is invalid.')
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
from oslo_config import cfg
|
||||
|
||||
from dragonflow.conf import df_active_port_detection
|
||||
from dragonflow.conf import df_cassandra
|
||||
from dragonflow.conf import df_common_params
|
||||
from dragonflow.conf import df_dhcp
|
||||
from dragonflow.conf import df_dnat
|
||||
|
@ -24,6 +25,7 @@ from dragonflow.conf import df_ryu
|
|||
CONF = cfg.CONF
|
||||
|
||||
|
||||
df_cassandra.register_opts()
|
||||
df_common_params.register_opts()
|
||||
df_dhcp.register_opts()
|
||||
df_metadata_service.register_opts()
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
# Copyright (c) 2016 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from dragonflow._i18n import _
|
||||
|
||||
|
||||
df_cassandra_opts = [
|
||||
cfg.StrOpt(
|
||||
'consistency_level',
|
||||
default='one',
|
||||
help=_('The default consistency level for Cassandra session.'
|
||||
'The value should be any, one, two, three, quorum, all,'
|
||||
'local_quorum, each_quorum, serial, local_serial, local_one.'),
|
||||
),
|
||||
cfg.StrOpt(
|
||||
'load_balancing',
|
||||
default='rr',
|
||||
help=_('The default load balancing policy for Cassandra cluster.'
|
||||
'The value should be rr, dc_rr, wl_rr, token_rr.'),
|
||||
),
|
||||
cfg.StrOpt(
|
||||
'local_dc_name',
|
||||
default='local',
|
||||
help=_('The DC name for dc_rr load balancing policy.'),
|
||||
),
|
||||
cfg.IntOpt(
|
||||
'used_hosts_per_remote_dc',
|
||||
default=0,
|
||||
help=_('The number of respected remote hosts for '
|
||||
'dc_rr load balancing policy.'),
|
||||
),
|
||||
cfg.StrOpt(
|
||||
'whitelist_hosts',
|
||||
default='localhost',
|
||||
help=_('The hosts to permit connections to for wl_rr load balancing '
|
||||
'policy. Please specify a list of hosts by comma.'),
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def register_opts():
|
||||
cfg.CONF.register_opts(df_cassandra_opts, group='df_cassandra')
|
||||
|
||||
|
||||
def list_opts():
|
||||
return {'df_cassandra': df_cassandra_opts}
|
|
@ -0,0 +1,211 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from cassandra import cluster
|
||||
from cassandra import policies
|
||||
from cassandra import query
|
||||
|
||||
from oslo_log import log
|
||||
|
||||
from dragonflow._i18n import _LE
|
||||
from dragonflow.common import exceptions as df_exceptions
|
||||
from dragonflow import conf as cfg
|
||||
from dragonflow.db import db_api
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
ROOT_KS = 'openstack'
|
||||
|
||||
CAS_TABLE = 'unique_key'
|
||||
|
||||
# NOTE(nick-ma-z): http://datastax.github.io/python-driver/
|
||||
# api/cassandra.html
|
||||
CONSISTENCY_MAPPING = {
|
||||
'any': query.ConsistencyLevel.ANY,
|
||||
'one': query.ConsistencyLevel.ONE,
|
||||
'two': query.ConsistencyLevel.TWO,
|
||||
'three': query.ConsistencyLevel.THREE,
|
||||
'quorum': query.ConsistencyLevel.QUORUM,
|
||||
'all': query.ConsistencyLevel.ALL,
|
||||
'local_quorum': query.ConsistencyLevel.LOCAL_QUORUM,
|
||||
'each_quorum': query.ConsistencyLevel.EACH_QUORUM,
|
||||
'serial': query.ConsistencyLevel.SERIAL,
|
||||
'local_serial': query.ConsistencyLevel.LOCAL_SERIAL,
|
||||
'local_one': query.ConsistencyLevel.LOCAL_ONE,
|
||||
}
|
||||
|
||||
|
||||
def _check_valid_host(host_str):
|
||||
return ':' in host_str and host_str[-1] != ':'
|
||||
|
||||
|
||||
def _parse_hosts(hosts):
|
||||
ips = []
|
||||
ports = []
|
||||
for host_str in hosts:
|
||||
if _check_valid_host(host_str):
|
||||
host_port = host_str.strip().split(':')
|
||||
ips.append(host_port[0])
|
||||
port = int(host_port[1])
|
||||
ports.append(port)
|
||||
if len(ports) > 0 and port not in ports:
|
||||
raise df_exceptions.InvalidDBHostConfiguration(host=host_str)
|
||||
else:
|
||||
LOG.error(_LE("The host string %s is invalid."), host_str)
|
||||
return (ips, ports[0])
|
||||
|
||||
|
||||
class CassandraDbDriver(db_api.DbApi):
|
||||
|
||||
def __init__(self):
|
||||
super(CassandraDbDriver, self).__init__()
|
||||
self.client = None
|
||||
self.config = cfg.CONF.df_cassandra
|
||||
|
||||
def _get_consistency_level(self, consistency_level):
|
||||
if consistency_level in CONSISTENCY_MAPPING:
|
||||
return CONSISTENCY_MAPPING[consistency_level]
|
||||
else:
|
||||
# by default
|
||||
return query.ConsistencyLevel.ONE
|
||||
|
||||
def _get_loadbalancing_policy(self, policy):
|
||||
# NOTE(nick-ma-z): http://datastax.github.io/python-driver/
|
||||
# api/cassandra/policies.html
|
||||
if policy == 'rr':
|
||||
return policies.RoundRobinPolicy()
|
||||
elif policy == 'dc_rr':
|
||||
return policies.DCAwareRoundRobinPolicy(
|
||||
cfg.CONF.df_cassandra.local_dc_name,
|
||||
cfg.CONF.df_cassandra.used_hosts_per_remote_dc)
|
||||
elif policy == 'wl_rr':
|
||||
return policies.WhiteListRoundRobinPolicy(
|
||||
cfg.CONF.df_cassandra.whitelist_hosts)
|
||||
elif policy == 'token_rr':
|
||||
return policies.TokenAwarePolicy(
|
||||
policies.RoundRobinPolicy())
|
||||
else:
|
||||
# by default
|
||||
return policies.RoundRobinPolicy()
|
||||
|
||||
def initialize(self, db_ip, db_port, **args):
|
||||
ips, default_port = _parse_hosts(args['config'].remote_db_hosts)
|
||||
lb_policy = self._get_loadbalancing_policy(
|
||||
self.config.load_balancing)
|
||||
consistency = self._get_consistency_level(
|
||||
self.config.consistency_level)
|
||||
|
||||
self.client = cluster.Cluster(ips, port=default_port,
|
||||
load_balancing_policy=lb_policy)
|
||||
self.session = self.client.connect(ROOT_KS)
|
||||
self.session.default_consistency_level = consistency
|
||||
self.session.row_factory = query.dict_factory
|
||||
|
||||
def support_publish_subscribe(self):
|
||||
return False
|
||||
|
||||
def create_table(self, table):
|
||||
self.session.execute("CREATE TABLE IF NOT EXISTS %s "
|
||||
"(key text PRIMARY KEY, value text);" % table)
|
||||
|
||||
def delete_table(self, table):
|
||||
self.session.execute("DROP TABLE %s;" % table)
|
||||
|
||||
def get_key(self, table, key, topic=None):
|
||||
try:
|
||||
rows = self.session.execute("SELECT value FROM %(table)s WHERE "
|
||||
"key='%(key)s';" % {'table': table,
|
||||
'key': key})
|
||||
return rows[0]['value']
|
||||
except Exception:
|
||||
raise df_exceptions.DBKeyNotFound(key=key)
|
||||
|
||||
def set_key(self, table, key, value, topic=None):
|
||||
self.session.execute("UPDATE %(table)s SET value='%(value)s' WHERE "
|
||||
"key='%(key)s';" % {'table': table,
|
||||
'key': key,
|
||||
'value': value})
|
||||
|
||||
def create_key(self, table, key, value, topic=None):
|
||||
self.session.execute("INSERT INTO %(table)s (key,value) VALUES "
|
||||
"('%(key)s','%(value)s') "
|
||||
"IF NOT EXISTS;" % {'table': table,
|
||||
'key': key,
|
||||
'value': value})
|
||||
|
||||
def delete_key(self, table, key, topic=None):
|
||||
try:
|
||||
self.session.execute("DELETE FROM %(table)s WHERE "
|
||||
"key='%(key)s';" % {'table': table,
|
||||
'key': key})
|
||||
except Exception:
|
||||
raise df_exceptions.DBKeyNotFound(key=key)
|
||||
|
||||
def get_all_entries(self, table, topic=None):
|
||||
res = []
|
||||
try:
|
||||
rows = self.session.execute("SELECT value FROM %s;" % table)
|
||||
except Exception:
|
||||
return res
|
||||
for entry in rows:
|
||||
if entry['value']:
|
||||
res.append(entry['value'])
|
||||
return res
|
||||
|
||||
def get_all_keys(self, table, topic=None):
|
||||
res = []
|
||||
try:
|
||||
rows = self.session.execute("SELECT key FROM %s;" % table)
|
||||
except Exception:
|
||||
raise df_exceptions.DBKeyNotFound(key=table)
|
||||
for entry in rows:
|
||||
res.append(entry['key'])
|
||||
return res
|
||||
|
||||
def _allocate_unique_key(self, table):
|
||||
orig_val = 0
|
||||
try:
|
||||
orig_val = int(self.get_key(CAS_TABLE, table))
|
||||
prev_val = str(orig_val)
|
||||
post_val = str(orig_val + 1)
|
||||
self.session.execute("UPDATE %(table)s SET value='%(post)s' "
|
||||
"WHERE key=%(key)s "
|
||||
"IF value='%(prev)s';" % {'table': CAS_TABLE,
|
||||
'post': post_val,
|
||||
'key': table,
|
||||
'prev': prev_val})
|
||||
return orig_val + 1
|
||||
except Exception:
|
||||
self.create_key(CAS_TABLE, table, "1")
|
||||
return 1
|
||||
|
||||
def allocate_unique_key(self, table):
|
||||
while True:
|
||||
try:
|
||||
return self._allocate_unique_key(table)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def register_notification_callback(self, callback):
|
||||
pass
|
||||
|
||||
def register_topic_for_notification(self, topic):
|
||||
pass
|
||||
|
||||
def unregister_topic_for_notification(self, topic):
|
||||
pass
|
||||
|
||||
def process_ha(self):
|
||||
pass
|
||||
|
||||
def set_neutron_server(self, is_neutron_server):
|
||||
pass
|
|
@ -5,6 +5,7 @@
|
|||
pbr>=1.8 # Apache-2.0
|
||||
Babel>=2.3.4 # BSD
|
||||
python-etcd>=0.4.3 # MIT License
|
||||
cassandra-driver>=2.1.4,!=3.6.0 # Apache-2.0
|
||||
kazoo>=2.2 # Apache-2.0
|
||||
ovs>=2.6.1 # Apache-2.0
|
||||
pyzmq>=14.3.1 # LGPL+BSD
|
||||
|
|
|
@ -69,6 +69,7 @@ dragonflow.nb_db_driver =
|
|||
ramcloud_nb_db_driver = dragonflow.db.drivers.ramcloud_db_driver:RamCloudDbDriver
|
||||
zookeeper_nb_db_driver = dragonflow.db.drivers.zookeeper_db_driver:ZookeeperDbDriver
|
||||
redis_nb_db_driver = dragonflow.db.drivers.redis_db_driver:RedisDbDriver
|
||||
cassandra_nb_db_driver = dragonflow.db.drivers.cassandra_db_driver:CassandraDbDriver
|
||||
dragonflow.port_status_driver =
|
||||
redis_port_status_notifier_driver = dragonflow.db.pubsub_drivers.redis_port_status_notifier:RedisPortStatusNotifier
|
||||
neutron.service_plugins =
|
||||
|
|
Loading…
Reference in New Issue