# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Pluggable Back-ends for Container Server
"""
import os
from uuid import uuid4
import time
import cPickle as pickle
import errno
import sqlite3
from swift.common.utils import Timestamp, lock_parent_directory
from swift.common.db import DatabaseBroker, DatabaseConnectionError, \
PENDING_CAP, PICKLE_PROTOCOL, utf8encode
DATADIR = 'containers'
POLICY_STAT_TABLE_CREATE = '''
CREATE TABLE policy_stat (
storage_policy_index INTEGER PRIMARY KEY,
object_count INTEGER DEFAULT 0,
bytes_used INTEGER DEFAULT 0
);
'''
POLICY_STAT_TRIGGER_SCRIPT = '''
CREATE TRIGGER object_insert_policy_stat AFTER INSERT ON object
BEGIN
UPDATE policy_stat
SET object_count = object_count + (1 - new.deleted),
bytes_used = bytes_used + new.size
WHERE storage_policy_index = new.storage_policy_index;
INSERT INTO policy_stat (
storage_policy_index, object_count, bytes_used)
SELECT new.storage_policy_index,
(1 - new.deleted),
new.size
WHERE NOT EXISTS(
SELECT changes() as change
FROM policy_stat
WHERE change <> 0
);
UPDATE container_info
SET hash = chexor(hash, new.name, new.created_at);
END;
CREATE TRIGGER object_delete_policy_stat AFTER DELETE ON object
BEGIN
UPDATE policy_stat
SET object_count = object_count - (1 - old.deleted),
bytes_used = bytes_used - old.size
WHERE storage_policy_index = old.storage_policy_index;
UPDATE container_info
SET hash = chexor(hash, old.name, old.created_at);
END;
'''
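# Illustrative example (not part of the upstream module): the INSERT trigger
# above keeps the per-policy rollup current as object rows are added.
# Starting from a fresh policy_stat row for policy 0:
#
#   INSERT INTO object (name, created_at, size, deleted, storage_policy_index)
#       VALUES ('o1', '0000000001.00000', 10, 0, 0);
#   -- policy_stat: object_count 0 -> 1, bytes_used 0 -> 10
#
#   INSERT INTO object (name, created_at, size, deleted, storage_policy_index)
#       VALUES ('o2', '0000000002.00000', 0, 1, 0);  -- a tombstone
#   -- policy_stat: object_count stays 1 (1 - new.deleted adds nothing) and
#   -- bytes_used stays 10 (tombstones are written with size 0)
#
# Both inserts also fold (name, created_at) into container_info.hash via the
# chexor() SQL function that swift.common.db registers on every connection.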
CONTAINER_INFO_TABLE_SCRIPT = '''
CREATE TABLE container_info (
account TEXT,
container TEXT,
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
reported_put_timestamp TEXT DEFAULT '0',
reported_delete_timestamp TEXT DEFAULT '0',
reported_object_count INTEGER DEFAULT 0,
reported_bytes_used INTEGER DEFAULT 0,
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0',
metadata TEXT DEFAULT '',
x_container_sync_point1 INTEGER DEFAULT -1,
x_container_sync_point2 INTEGER DEFAULT -1,
storage_policy_index INTEGER DEFAULT 0,
reconciler_sync_point INTEGER DEFAULT -1
);
'''
CONTAINER_STAT_VIEW_SCRIPT = '''
CREATE VIEW container_stat
AS SELECT ci.account, ci.container, ci.created_at,
ci.put_timestamp, ci.delete_timestamp,
ci.reported_put_timestamp, ci.reported_delete_timestamp,
ci.reported_object_count, ci.reported_bytes_used, ci.hash,
ci.id, ci.status, ci.status_changed_at, ci.metadata,
ci.x_container_sync_point1, ci.x_container_sync_point2,
ci.reconciler_sync_point,
ci.storage_policy_index,
coalesce(ps.object_count, 0) AS object_count,
coalesce(ps.bytes_used, 0) AS bytes_used
FROM container_info ci LEFT JOIN policy_stat ps
ON ci.storage_policy_index = ps.storage_policy_index;
CREATE TRIGGER container_stat_update
INSTEAD OF UPDATE ON container_stat
BEGIN
UPDATE container_info
SET account = NEW.account,
container = NEW.container,
created_at = NEW.created_at,
put_timestamp = NEW.put_timestamp,
delete_timestamp = NEW.delete_timestamp,
reported_put_timestamp = NEW.reported_put_timestamp,
reported_delete_timestamp = NEW.reported_delete_timestamp,
reported_object_count = NEW.reported_object_count,
reported_bytes_used = NEW.reported_bytes_used,
hash = NEW.hash,
id = NEW.id,
status = NEW.status,
status_changed_at = NEW.status_changed_at,
metadata = NEW.metadata,
x_container_sync_point1 = NEW.x_container_sync_point1,
x_container_sync_point2 = NEW.x_container_sync_point2,
storage_policy_index = NEW.storage_policy_index,
reconciler_sync_point = NEW.reconciler_sync_point;
END;
'''
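# Illustrative example (not part of the upstream module): container_stat is
# now a view over container_info and policy_stat, so a plain UPDATE against
# it would normally be rejected by SQLite.  The INSTEAD OF trigger above
# rewrites such writes against container_info, which keeps pre-policy
# statements like
#
#   UPDATE container_stat
#      SET reported_object_count = 42, reported_bytes_used = 4096;
#
# working unchanged, while reads through the view pick up object_count and
# bytes_used from policy_stat via the LEFT JOIN.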
class ContainerBroker(DatabaseBroker):
"""Encapsulates working with a container database."""
db_type = 'container'
db_contains_type = 'object'
db_reclaim_timestamp = 'created_at'
@property
def storage_policy_index(self):
if not hasattr(self, '_storage_policy_index'):
self._storage_policy_index = \
self.get_info()['storage_policy_index']
return self._storage_policy_index
def _initialize(self, conn, put_timestamp, storage_policy_index):
"""
Create a brand new container database (tables, indices, triggers, etc.)
"""
if not self.account:
raise ValueError(
'Attempting to create a new database with no account set')
if not self.container:
raise ValueError(
'Attempting to create a new database with no container set')
self.create_object_table(conn)
self.create_policy_stat_table(conn, storage_policy_index)
self.create_container_info_table(conn, put_timestamp,
storage_policy_index)
def create_object_table(self, conn):
"""
Create the object table which is specific to the container DB.
Not a part of Pluggable Back-ends, internal to the baseline code.
:param conn: DB connection object
"""
conn.executescript("""
CREATE TABLE object (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
created_at TEXT,
size INTEGER,
content_type TEXT,
etag TEXT,
deleted INTEGER DEFAULT 0,
storage_policy_index INTEGER DEFAULT 0
);
CREATE INDEX ix_object_deleted_name ON object (deleted, name);
CREATE TRIGGER object_update BEFORE UPDATE ON object
BEGIN
SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
END;
""" + POLICY_STAT_TRIGGER_SCRIPT)
def create_container_info_table(self, conn, put_timestamp,
storage_policy_index):
"""
Create the container_info table which is specific to the container DB.
Not a part of Pluggable Back-ends, internal to the baseline code.
Also creates the container_stat view.
:param conn: DB connection object
:param put_timestamp: put timestamp
:param storage_policy_index: storage policy index
"""
if put_timestamp is None:
put_timestamp = Timestamp(0).internal
# The container_stat view is for compatibility; old versions of Swift
# expected a container_stat table with columns "object_count" and
# "bytes_used", but when that stuff became per-storage-policy and
# moved to the policy_stat table, we stopped creating those columns in
# container_stat.
#
# To retain compatibility, we create the container_stat view with some
# triggers to make it behave like the old container_stat table. This
# way, if an old version of Swift encounters a database with the new
# schema, it can still work.
#
# Note that this can occur during a rolling Swift upgrade if a DB gets
# rsynced from an old node to a new node, so it's necessary for
# availability during upgrades. The fact that it enables downgrades is
# a nice bonus.
conn.executescript(CONTAINER_INFO_TABLE_SCRIPT +
CONTAINER_STAT_VIEW_SCRIPT)
conn.execute("""
INSERT INTO container_info (account, container, created_at, id,
put_timestamp, status_changed_at, storage_policy_index)
VALUES (?, ?, ?, ?, ?, ?, ?);
""", (self.account, self.container, Timestamp(time.time()).internal,
str(uuid4()), put_timestamp, put_timestamp,
storage_policy_index))
def create_policy_stat_table(self, conn, storage_policy_index=0):
"""
Create policy_stat table.
:param conn: DB connection object
:param storage_policy_index: the policy_index the container is
being created with
"""
conn.executescript(POLICY_STAT_TABLE_CREATE)
conn.execute("""
INSERT INTO policy_stat (storage_policy_index)
VALUES (?)
""", (storage_policy_index,))
def get_db_version(self, conn):
if self._db_version == -1:
self._db_version = 0
for row in conn.execute('''
SELECT name FROM sqlite_master
WHERE name = 'ix_object_deleted_name' '''):
self._db_version = 1
return self._db_version
def _newid(self, conn):
conn.execute('''
UPDATE container_stat
SET reported_put_timestamp = 0, reported_delete_timestamp = 0,
reported_object_count = 0, reported_bytes_used = 0''')
def _delete_db(self, conn, timestamp):
"""
Mark the DB as deleted
:param conn: DB connection object
:param timestamp: timestamp to mark as deleted
"""
conn.execute("""
UPDATE container_stat
SET delete_timestamp = ?,
status = 'DELETED',
status_changed_at = ?
WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))
def _commit_puts_load(self, item_list, entry):
"""See :func:`swift.common.db.DatabaseBroker._commit_puts_load`"""
data = pickle.loads(entry.decode('base64'))
(name, timestamp, size, content_type, etag, deleted) = data[:6]
if len(data) > 6:
storage_policy_index = data[6]
else:
storage_policy_index = 0
item_list.append({'name': name,
'created_at': timestamp,
'size': size,
'content_type': content_type,
'etag': etag,
'deleted': deleted,
'storage_policy_index': storage_policy_index})
def empty(self):
"""
Check if container DB is empty.
:returns: True if the database has no active objects, False otherwise
"""
self._commit_puts_stale_ok()
with self.get() as conn:
try:
row = conn.execute(
'SELECT max(object_count) from policy_stat').fetchone()
except sqlite3.OperationalError as err:
if not any(msg in str(err) for msg in (
"no such column: storage_policy_index",
"no such table: policy_stat")):
raise
row = conn.execute(
'SELECT object_count from container_stat').fetchone()
return (row[0] == 0)
def delete_object(self, name, timestamp, storage_policy_index=0):
"""
Mark an object deleted.
:param name: object name to be deleted
:param timestamp: timestamp when the object was marked as deleted
:param storage_policy_index: the storage policy index for the object
"""
self.put_object(name, timestamp, 0, 'application/deleted', 'noetag',
deleted=1, storage_policy_index=storage_policy_index)
def put_object(self, name, timestamp, size, content_type, etag, deleted=0,
storage_policy_index=0):
"""
Creates an object in the DB with its metadata.
:param name: object name to be created
:param timestamp: timestamp of when the object was created
:param size: object size
:param content_type: object content-type
:param etag: object etag
:param deleted: if True, marks the object as deleted; the given timestamp
is recorded as the deletion time
:param storage_policy_index: the storage policy index for the object
"""
record = {'name': name, 'created_at': timestamp, 'size': size,
'content_type': content_type, 'etag': etag,
'deleted': deleted,
'storage_policy_index': storage_policy_index}
if self.db_file == ':memory:':
self.merge_items([record])
return
if not os.path.exists(self.db_file):
raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
pending_size = 0
try:
pending_size = os.path.getsize(self.pending_file)
except OSError as err:
if err.errno != errno.ENOENT:
raise
if pending_size > PENDING_CAP:
self._commit_puts([record])
else:
with lock_parent_directory(self.pending_file,
self.pending_timeout):
with open(self.pending_file, 'a+b') as fp:
# Colons aren't used in base64 encoding, so they serve as our
# record delimiter
fp.write(':')
fp.write(pickle.dumps(
(name, timestamp, size, content_type, etag, deleted,
storage_policy_index),
protocol=PICKLE_PROTOCOL).encode('base64'))
fp.flush()
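# Illustrative example (not part of the upstream module): a .pending entry as
# written above and read back by _commit_puts_load(); the values are made up.
#
#   rec = ('o1', '0000000001.00000', 10, 'text/plain', 'etag', 0, 0)
#   entry = pickle.dumps(rec, protocol=PICKLE_PROTOCOL).encode('base64')
#   # appended to the .pending file as ':' + entry
#
# _commit_puts() later splits the file on ':' and hands each piece to
# _commit_puts_load(), which defaults storage_policy_index to 0 for six-field
# records written by pre-policy code.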
def _is_deleted_info(self, object_count, put_timestamp, delete_timestamp,
**kwargs):
"""
Apply delete logic to database info.
:returns: True if the DB is considered to be deleted, False otherwise
"""
# The container is considered deleted if the delete_timestamp
# value is greater than the put_timestamp, and there are no
# objects in the container.
return (object_count in (None, '', 0, '0')) and (
Timestamp(delete_timestamp) > Timestamp(put_timestamp))
def _is_deleted(self, conn):
"""
Check container_stat view and evaluate info.
:param conn: database conn
:returns: True if the DB is considered to be deleted, False otherwise
"""
info = conn.execute('''
SELECT put_timestamp, delete_timestamp, object_count
FROM container_stat''').fetchone()
return self._is_deleted_info(**info)
def get_info_is_deleted(self):
"""
Get the is_deleted status and info for the container.
:returns: a tuple, in the form (info, is_deleted), where info is a dict as
returned by get_info and is_deleted is a boolean.
"""
if self.db_file != ':memory:' and not os.path.exists(self.db_file):
return {}, True
info = self.get_info()
return info, self._is_deleted_info(**info)
def get_info(self):
"""
Get global data for the container.
:returns: dict with keys: account, container, created_at,
put_timestamp, delete_timestamp, status_changed_at,
object_count, bytes_used, reported_put_timestamp,
reported_delete_timestamp, reported_object_count,
reported_bytes_used, hash, id, x_container_sync_point1,
x_container_sync_point2, and storage_policy_index.
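
Example (illustrative, not part of the upstream docstring; ``db_path`` is
assumed)::

    broker = ContainerBroker(db_path, account='AUTH_test', container='c1')
    info = broker.get_info()
    # e.g. info['object_count'], info['bytes_used'],
    #      info['storage_policy_index']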
"""
self._commit_puts_stale_ok()
with self.get() as conn:
data = None
trailing_sync = 'x_container_sync_point1, x_container_sync_point2'
trailing_pol = 'storage_policy_index'
errors = set()
while not data:
try:
data = conn.execute(('''
SELECT account, container, created_at, put_timestamp,
delete_timestamp, status_changed_at,
object_count, bytes_used,
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash,
id, %s, %s
FROM container_stat
''') % (trailing_sync, trailing_pol)).fetchone()
except sqlite3.OperationalError as err:
err_msg = str(err)
if err_msg in errors:
# only attempt migration once
raise
errors.add(err_msg)
if 'no such column: storage_policy_index' in err_msg:
trailing_pol = '0 AS storage_policy_index'
elif 'no such column: x_container_sync_point' in err_msg:
trailing_sync = '-1 AS x_container_sync_point1, ' \
'-1 AS x_container_sync_point2'
else:
raise
data = dict(data)
# populate instance cache
self._storage_policy_index = data['storage_policy_index']
self.account = data['account']
self.container = data['container']
return data
def set_x_container_sync_points(self, sync_point1, sync_point2):
with self.get() as conn:
try:
self._set_x_container_sync_points(conn, sync_point1,
sync_point2)
except sqlite3.OperationalError as err:
if 'no such column: x_container_sync_point' not in \
str(err):
raise
self._migrate_add_container_sync_points(conn)
self._set_x_container_sync_points(conn, sync_point1,
sync_point2)
conn.commit()
def _set_x_container_sync_points(self, conn, sync_point1, sync_point2):
if sync_point1 is not None and sync_point2 is not None:
conn.execute('''
UPDATE container_stat
SET x_container_sync_point1 = ?,
x_container_sync_point2 = ?
''', (sync_point1, sync_point2))
elif sync_point1 is not None:
conn.execute('''
UPDATE container_stat
SET x_container_sync_point1 = ?
''', (sync_point1,))
elif sync_point2 is not None:
conn.execute('''
UPDATE container_stat
SET x_container_sync_point2 = ?
''', (sync_point2,))
def get_policy_stats(self):
with self.get() as conn:
try:
info = conn.execute('''
SELECT storage_policy_index, object_count, bytes_used
FROM policy_stat
''').fetchall()
except sqlite3.OperationalError as err:
if not any(msg in str(err) for msg in (
"no such column: storage_policy_index",
"no such table: policy_stat")):
raise
info = conn.execute('''
SELECT 0 as storage_policy_index, object_count, bytes_used
FROM container_stat
''').fetchall()
policy_stats = {}
for row in info:
stats = dict(row)
key = stats.pop('storage_policy_index')
policy_stats[key] = stats
return policy_stats
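# Illustrative example (not part of the upstream module): the result is keyed
# by storage policy index, e.g.
#
#   broker.get_policy_stats()
#   # -> {0: {'object_count': 12, 'bytes_used': 1024},
#   #     1: {'object_count': 3, 'bytes_used': 77}}
#
# Pre-policy databases fall back to container_stat and report all usage under
# policy index 0.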
def has_multiple_policies(self):
with self.get() as conn:
try:
curs = conn.execute('''
SELECT count(storage_policy_index)
FROM policy_stat
''').fetchone()
except sqlite3.OperationalError as err:
if 'no such table: policy_stat' not in str(err):
raise
# no policy_stat row
return False
if curs and curs[0] > 1:
return True
# only one policy_stat row
return False
def set_storage_policy_index(self, policy_index, timestamp=None):
"""
Update the container_stat storage_policy_index and status_changed_at.
"""
if timestamp is None:
timestamp = Timestamp(time.time()).internal
def _setit(conn):
conn.execute('''
INSERT OR IGNORE INTO policy_stat (storage_policy_index)
VALUES (?)
''', (policy_index,))
conn.execute('''
UPDATE container_stat
SET storage_policy_index = ?,
status_changed_at = MAX(?, status_changed_at)
WHERE storage_policy_index <> ?
''', (policy_index, timestamp, policy_index))
conn.commit()
with self.get() as conn:
try:
_setit(conn)
except sqlite3.OperationalError as err:
if not any(msg in str(err) for msg in (
"no such column: storage_policy_index",
"no such table: policy_stat")):
raise
self._migrate_add_storage_policy(conn)
_setit(conn)
self._storage_policy_index = policy_index
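# Illustrative example (not part of the upstream module): repointing a
# container at storage policy index 2.
#
#   broker.set_storage_policy_index(2)
#
# This ensures a policy_stat row exists for index 2 (INSERT OR IGNORE),
# rewrites container_stat.storage_policy_index only when it actually differs,
# and raises status_changed_at to at least the supplied (or generated)
# timestamp.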
def reported(self, put_timestamp, delete_timestamp, object_count,
bytes_used):
"""
Update reported stats, available with container's `get_info`.
:param put_timestamp: put_timestamp to update
:param delete_timestamp: delete_timestamp to update
:param object_count: object_count to update
:param bytes_used: bytes_used to update
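
Example (illustrative, not part of the upstream docstring): a caller such
as the container updater records what it just reported to the account
server::

    info = broker.get_info()
    # ... send object_count/bytes_used to the account server ...
    broker.reported(info['put_timestamp'], info['delete_timestamp'],
                    info['object_count'], info['bytes_used'])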
"""
with self.get() as conn:
conn.execute('''
UPDATE container_stat
SET reported_put_timestamp = ?, reported_delete_timestamp = ?,
reported_object_count = ?, reported_bytes_used = ?
''', (put_timestamp, delete_timestamp, object_count, bytes_used))
conn.commit()
def list_objects_iter(self, limit, marker, end_marker, prefix, delimiter,
path=None, storage_policy_index=0):
"""
Get a list of objects sorted by name starting at marker onward, up
to limit entries. Entries will begin with the prefix and will not
have the delimiter after the prefix.
:param limit: maximum number of entries to get
:param marker: marker query
:param end_marker: end marker query
:param prefix: prefix query
:param delimiter: delimiter for query
:param path: if defined, will set the prefix and delimiter based on
the path
:param storage_policy_index: storage policy index for the query
:returns: list of tuples of (name, created_at, size, content_type,
etag)
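
Example (illustrative, not part of the upstream docstring; assumes the
container holds 'movies/m', 'photos/a' and 'photos/b')::

    broker.list_objects_iter(100, '', None, 'photos/', '/')
    # -> the two object rows under 'photos/'
    broker.list_objects_iter(100, '', None, None, '/')
    # -> [['movies/', '0', 0, None, ''], ['photos/', '0', 0, None, '']]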
"""
delim_force_gte = False
(marker, end_marker, prefix, delimiter, path) = utf8encode(
marker, end_marker, prefix, delimiter, path)
self._commit_puts_stale_ok()
if path is not None:
prefix = path
if path:
prefix = path = path.rstrip('/') + '/'
delimiter = '/'
elif delimiter and not prefix:
prefix = ''
orig_marker = marker
with self.get() as conn:
results = []
while len(results) < limit:
query = '''SELECT name, created_at, size, content_type, etag
FROM object WHERE'''
query_args = []
if end_marker:
query += ' name < ? AND'
query_args.append(end_marker)
if delim_force_gte:
query += ' name >= ? AND'
query_args.append(marker)
# Always set back to False
delim_force_gte = False
elif marker and marker >= prefix:
query += ' name > ? AND'
query_args.append(marker)
elif prefix:
query += ' name >= ? AND'
query_args.append(prefix)
if self.get_db_version(conn) < 1:
query += ' +deleted = 0'
else:
query += ' deleted = 0'
orig_tail_query = '''
ORDER BY name LIMIT ?
'''
orig_tail_args = [limit - len(results)]
# storage policy filter
policy_tail_query = '''
AND storage_policy_index = ?
''' + orig_tail_query
policy_tail_args = [storage_policy_index] + orig_tail_args
tail_query, tail_args = \
policy_tail_query, policy_tail_args
try:
curs = conn.execute(query + tail_query,
tuple(query_args + tail_args))
except sqlite3.OperationalError as err:
if 'no such column: storage_policy_index' not in str(err):
raise
tail_query, tail_args = \
orig_tail_query, orig_tail_args
curs = conn.execute(query + tail_query,
tuple(query_args + tail_args))
curs.row_factory = None
if prefix is None:
# A delimiter without a specified prefix is ignored
return [r for r in curs]
if not delimiter:
if not prefix:
# It is possible to have a delimiter but no prefix
# specified. As above, the prefix will be set to the
# empty string, so avoid performing the extra work to
# check against an empty prefix.
return [r for r in curs]
else:
return [r for r in curs if r[0].startswith(prefix)]
# We have a delimiter and a prefix (possibly empty string) to
# handle
rowcount = 0
for row in curs:
rowcount += 1
marker = name = row[0]
if len(results) >= limit or not name.startswith(prefix):
curs.close()
return results
end = name.find(delimiter, len(prefix))
if path is not None:
if name == path:
continue
if end >= 0 and len(name) > end + len(delimiter):
marker = name[:end] + chr(ord(delimiter) + 1)
curs.close()
break
elif end > 0:
marker = name[:end] + chr(ord(delimiter) + 1)
# we want result to be inclusive of delim+1
delim_force_gte = True
dir_name = name[:end + 1]
if dir_name != orig_marker:
results.append([dir_name, '0', 0, None, ''])
curs.close()
break
results.append(row)
if not rowcount:
break
return results
def merge_items(self, item_list, source=None):
"""
Merge items into the object table.
:param item_list: list of dictionaries of {'name', 'created_at',
'size', 'content_type', 'etag', 'deleted',
'storage_policy_index'}
:param source: if defined, update incoming_sync with the source
"""
def _really_merge_items(conn):
max_rowid = -1
for rec in item_list:
rec.setdefault('storage_policy_index', 0) # legacy
query = '''
DELETE FROM object
WHERE name = ? AND (created_at < ?)
AND storage_policy_index = ?
'''
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
conn.execute(query, (rec['name'], rec['created_at'],
rec['storage_policy_index']))
query = '''
SELECT 1 FROM object WHERE name = ?
AND storage_policy_index = ?
'''
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
if not conn.execute(query, (
rec['name'], rec['storage_policy_index'])).fetchall():
conn.execute('''
INSERT INTO object (name, created_at, size,
content_type, etag, deleted, storage_policy_index)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', ([rec['name'], rec['created_at'], rec['size'],
rec['content_type'], rec['etag'], rec['deleted'],
rec['storage_policy_index']]))
if source:
max_rowid = max(max_rowid, rec['ROWID'])
if source:
try:
conn.execute('''
INSERT INTO incoming_sync (sync_point, remote_id)
VALUES (?, ?)
''', (max_rowid, source))
except sqlite3.IntegrityError:
conn.execute('''
UPDATE incoming_sync SET sync_point=max(?, sync_point)
WHERE remote_id=?
''', (max_rowid, source))
conn.commit()
with self.get() as conn:
try:
return _really_merge_items(conn)
except sqlite3.OperationalError as err:
if 'no such column: storage_policy_index' not in str(err):
raise
self._migrate_add_storage_policy(conn)
return _really_merge_items(conn)
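# Illustrative example (not part of the upstream module): the replicator feeds
# rows pulled from a peer into merge_items(); the newest created_at for a
# given (name, storage_policy_index) wins.
#
#   broker.merge_items([
#       {'name': 'o1', 'created_at': '0000000002.00000', 'size': 0,
#        'content_type': 'application/deleted', 'etag': 'noetag',
#        'deleted': 1, 'storage_policy_index': 0, 'ROWID': 17}],
#       source='remote-db-id')
#
# Any existing 'o1' row with an older created_at is removed, the tombstone is
# inserted, and incoming_sync for 'remote-db-id' advances to sync_point 17.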
def get_reconciler_sync(self):
with self.get() as conn:
try:
return conn.execute('''
SELECT reconciler_sync_point FROM container_stat
''').fetchone()[0]
except sqlite3.OperationalError as err:
if "no such column: reconciler_sync_point" not in str(err):
raise
return -1
def update_reconciler_sync(self, point):
query = '''
UPDATE container_stat
SET reconciler_sync_point = ?
'''
with self.get() as conn:
try:
conn.execute(query, (point,))
except sqlite3.OperationalError as err:
if "no such column: reconciler_sync_point" not in str(err):
raise
self._migrate_add_storage_policy(conn)
conn.execute(query, (point,))
conn.commit()
def get_misplaced_since(self, start, count):
"""
Get a list of objects which are in a storage policy different
from the container's storage policy.
:param start: last reconciler sync point
:param count: maximum number of entries to get
:returns: list of dicts with keys: ROWID, name, created_at, size,
content_type, etag, deleted, storage_policy_index
"""
qry = '''
SELECT ROWID, name, created_at, size, content_type, etag,
deleted, storage_policy_index
FROM object
WHERE ROWID > ?
AND storage_policy_index != (
SELECT storage_policy_index FROM container_stat LIMIT 1)
ORDER BY ROWID ASC LIMIT ?
'''
self._commit_puts_stale_ok()
with self.get() as conn:
try:
cur = conn.execute(qry, (start, count))
except sqlite3.OperationalError as err:
if "no such column: storage_policy_index" not in str(err):
raise
return []
return list(dict(row) for row in cur.fetchall())
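# Illustrative example (not part of the upstream module): a caller can page
# through misplaced rows using the stored reconciler sync point.
#
#   start = broker.get_reconciler_sync()
#   rows = broker.get_misplaced_since(start, 50)
#   # rows -> [{'ROWID': 21, 'name': 'o1', 'storage_policy_index': 2,
#   #           'deleted': 0, ...}, ...]
#   if rows:
#       # enqueue the rows for the reconciler, then record the progress:
#       broker.update_reconciler_sync(rows[-1]['ROWID'])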
def _migrate_add_container_sync_points(self, conn):
"""
Add the x_container_sync_point columns to the 'container_stat' table.
"""
conn.executescript('''
BEGIN;
ALTER TABLE container_stat
ADD COLUMN x_container_sync_point1 INTEGER DEFAULT -1;
ALTER TABLE container_stat
ADD COLUMN x_container_sync_point2 INTEGER DEFAULT -1;
COMMIT;
''')
def _migrate_add_storage_policy(self, conn):
"""
Migrate the container schema to support tracking objects from
multiple storage policies. If the container_stat table has any
pending migrations, they are applied now before copying into
container_info.
* create the 'policy_stat' table.
* copy the current 'object_count' and 'bytes_used' columns to a
row in the 'policy_stat' table.
* add the storage_policy_index column to the 'object' table.
* drop the 'object_insert' and 'object_delete' triggers.
* add the 'object_insert_policy_stat' and
'object_delete_policy_stat' triggers.
* create container_info table for non-policy container info
* insert values from container_stat into container_info
* drop container_stat table
* create container_stat view
"""
# I tried just getting the list of column names in the current
# container_stat table with a pragma table_info, but could never get
# it inside the same transaction as the DDL (non-DML) statements:
# https://docs.python.org/2/library/sqlite3.html
# #controlling-transactions
# So we just apply all pending migrations to container_stat and copy a
# static known list of column names into container_info.
try:
self._migrate_add_container_sync_points(conn)
except sqlite3.OperationalError as e:
if 'duplicate column' in str(e):
conn.execute('ROLLBACK;')
else:
raise
try:
conn.executescript("""
ALTER TABLE container_stat
ADD COLUMN metadata TEXT DEFAULT '';
""")
except sqlite3.OperationalError as e:
if 'duplicate column' not in str(e):
raise
column_names = ', '.join((
'account', 'container', 'created_at', 'put_timestamp',
'delete_timestamp', 'reported_put_timestamp',
'reported_object_count', 'reported_bytes_used', 'hash', 'id',
'status', 'status_changed_at', 'metadata',
'x_container_sync_point1', 'x_container_sync_point2'))
conn.executescript(
'BEGIN;' +
POLICY_STAT_TABLE_CREATE +
'''
INSERT INTO policy_stat (
storage_policy_index, object_count, bytes_used)
SELECT 0, object_count, bytes_used
FROM container_stat;
ALTER TABLE object
ADD COLUMN storage_policy_index INTEGER DEFAULT 0;
DROP TRIGGER object_insert;
DROP TRIGGER object_delete;
''' +
POLICY_STAT_TRIGGER_SCRIPT +
CONTAINER_INFO_TABLE_SCRIPT +
'''
INSERT INTO container_info (%s)
SELECT %s FROM container_stat;
DROP TABLE IF EXISTS container_stat;
''' % (column_names, column_names) +
CONTAINER_STAT_VIEW_SCRIPT +
'COMMIT;')
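# Illustrative note (not part of the upstream module): this migration is never
# run eagerly.  Methods such as set_storage_policy_index(), merge_items() and
# update_reconciler_sync() attempt their statement first, catch the
# sqlite3.OperationalError raised by a pre-policy schema, run
# _migrate_add_storage_policy(conn) on the same connection, and then retry the
# original statement.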