Fix redundant indexes, unindexable fields, and wrong isolation level in JSON ingester
Fixes a bug where the JSON ingester creates a redundant index each time Congress restarts. Mitigates a problem where indexing fails when a field exceeds the PostgreSQL indexable limit of 2712 bytes: index creation is now optional and can be disabled per table. Fixes a bug where unexpected behavior occurs when the backend DB's default isolation level is not the expected READ COMMITTED; the isolation level is now set explicitly on each connection. Change-Id: I514aa9b96e4efbffe8880cce775dc2259eca4648 Closes-bug: 1819987 Closes-bug: 1819988 Closes-bug: 1819985
This commit is contained in:
parent
e84057a6f5
commit
a1dfbdec00
|
@ -134,11 +134,18 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver):
|
||||||
# the _key column is added as key in order to support performant
|
# the _key column is added as key in order to support performant
|
||||||
# delete of specific rows in delta update to the db table
|
# delete of specific rows in delta update to the db table
|
||||||
|
|
||||||
create_index_statement = \
|
create_index_statement = """
|
||||||
"""CREATE INDEX on {}.{} USING GIN (d);"""
|
CREATE INDEX IF NOT EXISTS __{table}_d_gin_idx on {schema}.{table}
|
||||||
|
USING GIN (d);"""
|
||||||
|
drop_index_statement = """
|
||||||
|
DROP INDEX IF EXISTS __{schema}.{table}_d_gin_idx;"""
|
||||||
conn = None
|
conn = None
|
||||||
try:
|
try:
|
||||||
conn = psycopg2.connect(cfg.CONF.json_ingester.db_connection)
|
conn = psycopg2.connect(cfg.CONF.json_ingester.db_connection)
|
||||||
|
conn.set_session(
|
||||||
|
isolation_level=psycopg2.extensions.
|
||||||
|
ISOLATION_LEVEL_READ_COMMITTED,
|
||||||
|
readonly=False, autocommit=False)
|
||||||
cur = conn.cursor()
|
cur = conn.cursor()
|
||||||
# create schema
|
# create schema
|
||||||
cur.execute(
|
cur.execute(
|
||||||
|
@ -148,9 +155,14 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver):
|
||||||
# create table
|
# create table
|
||||||
cur.execute(sql.SQL(create_table_statement).format(
|
cur.execute(sql.SQL(create_table_statement).format(
|
||||||
sql.Identifier(self.name), sql.Identifier(table_name)))
|
sql.Identifier(self.name), sql.Identifier(table_name)))
|
||||||
# TODO(ekcs): make index creation optional
|
if self._config['tables'][table_name].get('gin_index', True):
|
||||||
cur.execute(sql.SQL(create_index_statement).format(
|
cur.execute(sql.SQL(create_index_statement).format(
|
||||||
sql.Identifier(self.name), sql.Identifier(table_name)))
|
schema=sql.Identifier(self.name),
|
||||||
|
table=sql.Identifier(table_name)))
|
||||||
|
else:
|
||||||
|
cur.execute(sql.SQL(drop_index_statement).format(
|
||||||
|
schema=sql.Identifier(self.name),
|
||||||
|
table=sql.Identifier(table_name)))
|
||||||
conn.commit()
|
conn.commit()
|
||||||
cur.close()
|
cur.close()
|
||||||
except (Exception, psycopg2.Error):
|
except (Exception, psycopg2.Error):
|
||||||
|
@ -179,6 +191,10 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver):
|
||||||
conn = None
|
conn = None
|
||||||
try:
|
try:
|
||||||
conn = psycopg2.connect(cfg.CONF.json_ingester.db_connection)
|
conn = psycopg2.connect(cfg.CONF.json_ingester.db_connection)
|
||||||
|
conn.set_session(
|
||||||
|
isolation_level=psycopg2.extensions.
|
||||||
|
ISOLATION_LEVEL_READ_COMMITTED,
|
||||||
|
readonly=False, autocommit=False)
|
||||||
cur = conn.cursor()
|
cur = conn.cursor()
|
||||||
if use_snapshot:
|
if use_snapshot:
|
||||||
to_insert = new_data
|
to_insert = new_data
|
||||||
|
@ -345,6 +361,10 @@ class JsonIngester(datasource_driver.PollingDataSourceDriver):
|
||||||
conn = None
|
conn = None
|
||||||
try:
|
try:
|
||||||
conn = psycopg2.connect(cfg.CONF.json_ingester.db_connection)
|
conn = psycopg2.connect(cfg.CONF.json_ingester.db_connection)
|
||||||
|
conn.set_session(
|
||||||
|
isolation_level=psycopg2.extensions.
|
||||||
|
ISOLATION_LEVEL_READ_COMMITTED,
|
||||||
|
readonly=False, autocommit=False)
|
||||||
cur = conn.cursor()
|
cur = conn.cursor()
|
||||||
# delete the appropriate row from table
|
# delete the appropriate row from table
|
||||||
cur.execute(sql.SQL(delete_tuple_statement).format(
|
cur.execute(sql.SQL(delete_tuple_statement).format(
|
||||||
|
|
|
@ -5,6 +5,7 @@ api_endpoint_host: https://cve.circl.lu
|
||||||
api_endpoint_path: api/
|
api_endpoint_path: api/
|
||||||
tables:
|
tables:
|
||||||
linux_kernel:
|
linux_kernel:
|
||||||
|
gin_index: false
|
||||||
poll:
|
poll:
|
||||||
api_path: search/linux/kernel
|
api_path: search/linux/kernel
|
||||||
api_method: get
|
api_method: get
|
||||||
|
|
Loading…
Reference in New Issue