Move sqlite code in common module

Moved sqlite connection code in new module so that we can use
the same in migration phase. Once sqlite driver is removed then
we can remove this common module as well.

Related blueprint centralized-cache-db

Change-Id: Id34f9ae7a2639023c8b6a7937487e7af5c2dccad
This commit is contained in:
Abhishek Kekane 2023-11-02 08:07:05 +00:00
parent 9c7820740a
commit 21333c1ac8
2 changed files with 103 additions and 72 deletions

View File

@ -0,0 +1,90 @@
# Copyright 2023 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Common code which will be used in SQLite and centralzed_db driver until SQLite
driver is removed from glance.
"""
from contextlib import contextmanager
import sqlite3
from eventlet import sleep
from eventlet import timeout
from oslo_log import log as logging
from glance.i18n import _LE
LOG = logging.getLogger(__name__)
DEFAULT_SQL_CALL_TIMEOUT = 2
def dict_factory(cur, row):
return {col[0]: row[idx] for idx, col in enumerate(cur.description)}
class SqliteConnection(sqlite3.Connection):
"""
SQLite DB Connection handler that plays well with eventlet,
slightly modified from Swift's similar code.
"""
def __init__(self, *args, **kwargs):
self.timeout_seconds = kwargs.get('timeout', DEFAULT_SQL_CALL_TIMEOUT)
kwargs['timeout'] = 0
sqlite3.Connection.__init__(self, *args, **kwargs)
def _timeout(self, call):
with timeout.Timeout(self.timeout_seconds):
while True:
try:
return call()
except sqlite3.OperationalError as e:
if 'locked' not in str(e):
raise
sleep(0.05)
def execute(self, *args, **kwargs):
return self._timeout(lambda: sqlite3.Connection.execute(
self, *args, **kwargs))
def commit(self):
return self._timeout(lambda: sqlite3.Connection.commit(self))
@contextmanager
def get_db(db_path):
"""
Returns a context manager that produces a database connection that
self-closes and calls rollback if an error occurs while using the
database connection
"""
conn = sqlite3.connect(db_path, check_same_thread=False,
factory=SqliteConnection)
conn.row_factory = sqlite3.Row
conn.text_factory = str
conn.execute('PRAGMA synchronous = NORMAL')
conn.execute('PRAGMA count_changes = OFF')
conn.execute('PRAGMA temp_store = MEMORY')
try:
yield conn
except sqlite3.DatabaseError as e:
msg = _LE("Error executing SQLite call. Got error: %s") % e
LOG.error(msg)
conn.rollback()
finally:
conn.close()

View File

@ -22,8 +22,6 @@ import sqlite3
import stat
import time
from eventlet import sleep
from eventlet import timeout
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
@ -31,8 +29,10 @@ from oslo_utils import excutils
from oslo_utils import fileutils
from glance.common import exception
from glance.i18n import _, _LE, _LI, _LW
from glance.i18n import _, _LI, _LW
from glance.image_cache.drivers import base
from glance.image_cache.drivers import common
LOG = logging.getLogger(__name__)
@ -69,42 +69,6 @@ Related options:
CONF = cfg.CONF
CONF.register_opts(sqlite_opts)
DEFAULT_SQL_CALL_TIMEOUT = 2
class SqliteConnection(sqlite3.Connection):
"""
SQLite DB Connection handler that plays well with eventlet,
slightly modified from Swift's similar code.
"""
def __init__(self, *args, **kwargs):
self.timeout_seconds = kwargs.get('timeout', DEFAULT_SQL_CALL_TIMEOUT)
kwargs['timeout'] = 0
sqlite3.Connection.__init__(self, *args, **kwargs)
def _timeout(self, call):
with timeout.Timeout(self.timeout_seconds):
while True:
try:
return call()
except sqlite3.OperationalError as e:
if 'locked' not in str(e):
raise
sleep(0.05)
def execute(self, *args, **kwargs):
return self._timeout(lambda: sqlite3.Connection.execute(
self, *args, **kwargs))
def commit(self):
return self._timeout(lambda: sqlite3.Connection.commit(self))
def dict_factory(cur, row):
return {col[0]: row[idx] for idx, col in enumerate(cur.description)}
class Driver(base.Driver):
@ -135,7 +99,7 @@ class Driver(base.Driver):
def create_db():
try:
conn = sqlite3.connect(self.db_path, check_same_thread=False,
factory=SqliteConnection)
factory=common.SqliteConnection)
conn.executescript("""
CREATE TABLE IF NOT EXISTS cached_images (
image_id TEXT PRIMARY KEY,
@ -178,7 +142,7 @@ class Driver(base.Driver):
return 0
hits = 0
with self.get_db() as db:
with common.get_db(self.db_path) as db:
cur = db.execute("""SELECT hits FROM cached_images
WHERE image_id = ?""",
(image_id,))
@ -190,12 +154,12 @@ class Driver(base.Driver):
Returns a list of records about cached images.
"""
LOG.debug("Gathering cached image entries.")
with self.get_db() as db:
with common.get_db(self.db_path) as db:
cur = db.execute("""SELECT
image_id, hits, last_accessed, last_modified, size
FROM cached_images
ORDER BY image_id""")
cur.row_factory = dict_factory
cur.row_factory = common.dict_factory
return [r for r in cur]
def is_cached(self, image_id):
@ -242,7 +206,7 @@ class Driver(base.Driver):
Removes all cached image files and any attributes about the images
"""
deleted = 0
with self.get_db() as db:
with common.get_db(self.db_path) as db:
for path in self.get_cache_files(self.base_dir):
delete_cached_file(path)
deleted += 1
@ -257,7 +221,7 @@ class Driver(base.Driver):
:param image_id: Image ID
"""
path = self.get_image_filepath(image_id)
with self.get_db() as db:
with common.get_db(self.db_path) as db:
delete_cached_file(path)
db.execute("""DELETE FROM cached_images WHERE image_id = ?""",
(image_id, ))
@ -301,7 +265,7 @@ class Driver(base.Driver):
Return a tuple containing the image_id and size of the least recently
accessed cached file, or None if no cached files.
"""
with self.get_db() as db:
with common.get_db(self.db_path) as db:
cur = db.execute("""SELECT image_id FROM cached_images
ORDER BY last_accessed LIMIT 1""")
try:
@ -329,7 +293,7 @@ class Driver(base.Driver):
incomplete_path = self.get_image_filepath(image_id, 'incomplete')
def commit():
with self.get_db() as db:
with common.get_db(self.db_path) as db:
final_path = self.get_image_filepath(image_id)
LOG.debug("Fetch finished, moving "
"'%(incomplete_path)s' to '%(final_path)s'",
@ -352,7 +316,7 @@ class Driver(base.Driver):
db.commit()
def rollback(e):
with self.get_db() as db:
with common.get_db(self.db_path) as db:
if os.path.exists(incomplete_path):
invalid_path = self.get_image_filepath(image_id, 'invalid')
@ -398,36 +362,13 @@ class Driver(base.Driver):
with open(path, 'rb') as cache_file:
yield cache_file
now = time.time()
with self.get_db() as db:
with common.get_db(self.db_path) as db:
db.execute("""UPDATE cached_images
SET hits = hits + 1, last_accessed = ?
WHERE image_id = ?""",
(now, image_id))
db.commit()
@contextmanager
def get_db(self):
"""
Returns a context manager that produces a database connection that
self-closes and calls rollback if an error occurs while using the
database connection
"""
conn = sqlite3.connect(self.db_path, check_same_thread=False,
factory=SqliteConnection)
conn.row_factory = sqlite3.Row
conn.text_factory = str
conn.execute('PRAGMA synchronous = NORMAL')
conn.execute('PRAGMA count_changes = OFF')
conn.execute('PRAGMA temp_store = MEMORY')
try:
yield conn
except sqlite3.DatabaseError as e:
msg = _LE("Error executing SQLite call. Got error: %s") % e
LOG.error(msg)
conn.rollback()
finally:
conn.close()
def queue_image(self, image_id):
"""
This adds a image to be cache to the queue.