congress/congress/datasources/json_ingester/json_ingester.py

# Copyright (c) 2018, 2019 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import datetime
import json
import sys
from jsonpath_rw import parser
from oslo_config import cfg
from oslo_log import log as logging
import psycopg2
from psycopg2 import sql
import requests
from congress.api import base as api_base
from congress.datasources import datasource_driver
from congress.datasources import datasource_utils
from congress.dse2 import data_service
from congress import exception
LOG = logging.getLogger(__name__)
class JsonIngester(datasource_driver.PollingDataSourceDriver):
def __init__(self, name, config, exec_manager):
def validate_config(config):
# FIXME: use json schema to validate config
config_tables = config['tables']
poll_tables = [table for table in config_tables
if 'poll' in config_tables[table]]
if len(poll_tables) > 0:
# FIXME: when polling table exists, require configs:
# api_endpoint, authentication
pass
for table_name in config_tables:
if ('poll' in config_tables[table_name]
and 'webhook' in config_tables[table_name]):
raise exception.BadConfig(
'Table ({}) cannot be configured for '
'both poll and webhook.'.format(table_name))
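# Illustrative sketch of the config structure this driver consumes,
# inferred from the keys accessed in this class; the endpoint, auth
# block, table names, and jsonpath values below are made-up examples,
# not values shipped with Congress:
#
#   config = {
#       'api_endpoint': 'http://example.com:8774/v2.1',   # hypothetical
#       'authentication': {'type': 'keystone',
#                          'config': {...}},  # passed to the keystone session
#       'api_default_headers': {'Accept': 'application/json'},
#       'poll_interval': 60,
#       'tables': {
#           'servers': {'poll': {'api_path': 'servers/detail',
#                                'jsonpath': '$.servers[*]'}},
#           'alarms': {'webhook': {'record_jsonpath': '$.payload',
#                                  'id_jsonpath': '$.alarm_id'},
#                      'gin_index': False},
#       },
#   }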
# use prefix to avoid service_id clash with regular data sources
super(JsonIngester, self).__init__(
api_base.JSON_DS_SERVICE_PREFIX + name)
self.exec_manager = exec_manager # ref to global mgr for api exec
self.type = 'json_ingester'
self.name = name # set name back to one without prefix for use here
if 'tables' not in config:
# config w/o table used to define exec_api endpoint
# in this case, no need to create datasource service
return
validate_config(config)
self._config = config
self._create_schema_and_tables()
self.poll_time = self._config.get('poll_interval', 60)
self._setup_table_key_sets()
self._api_endpoint = self._config.get('api_endpoint')
self._initialize_session()
self._initialize_update_methods()
if len(self.update_methods) > 0:
self._init_end_start_poll()
else:
self.initialized = True
# For DSE2. Must go after __init__
if hasattr(self, 'add_rpc_endpoint'):
self.add_rpc_endpoint(JsonIngesterEndpoints(self))
def _setup_table_key_sets(self):
# Because postgres cannot directly use the jsonb column d as a key,
# the _key column is added as the primary key in order to support
# performant deletes of specific rows during delta updates to the db table.
# For each table, maintain in memory an association between the json
# data and a unique key; the association is maintained using the
# KeyMap class.
# Note: the key may change from session to session, which does not
# cause a problem here because the db tables (along with old keys)
# are cleared each time congress starts.
# { table_name -> KeyMap object }
self.key_sets = {}
for table_name in self._config['tables']:
self.key_sets[table_name] = KeyMap()
def _clear_table_state(self, table_name):
del self.state[table_name]
self.key_sets[table_name].clear()
def publish(self, table, data, use_snapshot=False):
LOG.debug('JSON Ingester "%s" publishing table "%s"', self.name, table)
LOG.trace('publish(self=%s, table=%s, data=%s, use_snapshot=%s)',
self, table, data, use_snapshot)
return self._update_table(
table, new_data=data,
old_data=self.prior_state.get(table, set([])),
use_snapshot=use_snapshot)
def _create_schema_and_tables(self):
create_schema_statement = """CREATE SCHEMA IF NOT EXISTS {};"""
create_table_statement = """
CREATE TABLE IF NOT EXISTS {}.{}
(d jsonb, _key text, primary key (_key));"""
# Note: because postgres cannot directly use the jsonb column d as a key,
# the _key column is added as the primary key in order to support
# performant deletes of specific rows during delta updates to the db table
create_index_statement = """
CREATE INDEX IF NOT EXISTS __{table}_d_gin_idx on {schema}.{table}
USING GIN (d);"""
drop_index_statement = """
DROP INDEX IF EXISTS {schema}.__{table}_d_gin_idx;"""
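# For illustration (made-up names): with self.name == 'nova' and a table
# named 'servers', psycopg2's sql.Identifier quoting renders the first two
# statements roughly as:
#   CREATE SCHEMA IF NOT EXISTS "nova";
#   CREATE TABLE IF NOT EXISTS "nova"."servers"
#       (d jsonb, _key text, primary key (_key));
# Each row stores one JSON record in the jsonb column d, keyed by _key;
# the optional GIN index on d (per-table 'gin_index' flag, default True)
# speeds up jsonb containment and existence queries against ingested data.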
conn = None
try:
conn = psycopg2.connect(cfg.CONF.json_ingester.db_connection)
conn.set_session(
isolation_level=psycopg2.extensions.
ISOLATION_LEVEL_READ_COMMITTED,
readonly=False, autocommit=False)
cur = conn.cursor()
# create schema
cur.execute(
sql.SQL(create_schema_statement).format(
sql.Identifier(self.name)))
for table_name in self._config['tables']:
# create table
cur.execute(sql.SQL(create_table_statement).format(
sql.Identifier(self.name), sql.Identifier(table_name)))
if self._config['tables'][table_name].get('gin_index', True):
cur.execute(sql.SQL(create_index_statement).format(
schema=sql.Identifier(self.name),
table=sql.Identifier(table_name)))
else:
cur.execute(sql.SQL(drop_index_statement).format(
schema=sql.Identifier(self.name),
table=sql.Identifier(table_name)))
conn.commit()
cur.close()
except (Exception, psycopg2.Error):
if 'table_name' in locals():
LOG.exception("Error creating table %s in schema %s",
table_name, self.name)
else:
LOG.exception("Error creating schema %s", self.name)
raise
finally:
if conn is not None:
conn.close()
def _update_table(
self, table_name, new_data, old_data, use_snapshot):
# return False immediately if no change to update
if new_data == old_data:
return False
insert_statement = """INSERT INTO {}.{}
VALUES(%s, %s);"""
delete_all_statement = """DELETE FROM {}.{};"""
delete_tuple_statement = """
DELETE FROM {}.{} WHERE _key = %s;"""
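# Delta path illustration (made-up values): if old_data == {'a', 'b'} and
# new_data == {'b', 'c'}, then to_insert == {'c'} and to_delete == {'a'};
# only those two rows are touched, whereas a snapshot update instead clears
# the table and re-inserts all of new_data.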
conn = None
try:
conn = psycopg2.connect(cfg.CONF.json_ingester.db_connection)
conn.set_session(
isolation_level=psycopg2.extensions.
ISOLATION_LEVEL_READ_COMMITTED,
readonly=False, autocommit=False)
cur = conn.cursor()
if use_snapshot:
to_insert = new_data
# delete all existing data from table
cur.execute(sql.SQL(delete_all_statement).format(
sql.Identifier(self.name), sql.Identifier(table_name)))
self.key_sets[table_name].clear()
else:
to_insert = new_data - old_data
to_delete = old_data - new_data
# delete the appropriate rows from table
for d in to_delete:
cur.execute(sql.SQL(delete_tuple_statement).format(
sql.Identifier(self.name),
sql.Identifier(table_name)),
(str(self.key_sets[table_name].remove_and_get_key(d)),)
)
# insert new data into table
for d in to_insert:
cur.execute(sql.SQL(insert_statement).format(
sql.Identifier(self.name),
sql.Identifier(table_name)),
(d, str(self.key_sets[table_name].add_and_get_key(d))))
conn.commit()
cur.close()
return True # return True indicating change made
except (Exception, psycopg2.Error):
LOG.exception("Error writing to DB")
# makes the next update use snapshot
self._clear_table_state(table_name)
return False # return False indicating no change made (rollback)
finally:
if conn is not None:
conn.close()
def add_update_method(self, method, table_name):
if table_name in self.update_methods:
raise exception.Conflict('A method has already registered for '
'the table %s.' %
table_name)
self.update_methods[table_name] = method
def _initialize_session(self):
auth_config = self._config.get('authentication')
if auth_config is None:
self._session = requests.Session()
self._session.headers.update(
self._config.get('api_default_headers', {}))
else:
if auth_config['type'] == 'keystone':
self._session = datasource_utils.get_keystone_session(
self._config['authentication']['config'],
headers=self._config.get('api_default_headers', {}))
else:
LOG.error('authentication type %s not supported.',
auth_config['type'])
raise exception.BadConfig(
'authentication type {} not supported.'.format(
auth_config['type']))
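# Illustration of how a 'poll' table entry is serviced by the update
# method built below (URL and jsonpath are made-up examples): with
# api_endpoint 'http://example.com:8774/v2.1' and a table configured as
#   {'poll': {'api_path': 'servers/detail', 'jsonpath': '$.servers[*]'}}
# the driver issues GET http://example.com:8774/v2.1/servers/detail,
# applies the jsonpath to the JSON response to pick out each server
# object, and stores the matches in self.state[table_name] as a set of
# json.dumps(..., sort_keys=True) strings so that set difference can be
# used for delta updates.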
def _initialize_update_methods(self):
for table_name in self._config['tables']:
if 'poll' in self._config['tables'][table_name]:
table_info = self._config['tables'][table_name]['poll']
# Note: using default parameters to get early-binding of
# variables in closure
def update_method(
table_name=table_name, table_info=table_info):
try:
full_path = self._api_endpoint.rstrip(
'/') + '/' + table_info['api_path'].lstrip('/')
result = self._session.get(full_path).json()
# FIXME: generalize to other verbs?
jsonpath_expr = parser.parse(table_info['jsonpath'])
ingest_data = [match.value for match in
jsonpath_expr.find(result)]
self.state[table_name] = set(
[json.dumps(item, sort_keys=True)
for item in ingest_data])
except BaseException:
LOG.exception('Exception occurred while updating '
'table %s.%s from: URL %s',
self.name, table_name,
full_path)
self.add_update_method(update_method, table_name)
def update_from_datasource(self):
for table in self.update_methods:
LOG.debug('update table %s.' % table)
self.update_methods[table]()
# Note(thread-safety): blocking function
def poll(self):
"""Periodically called to update new info.
Function called periodically to grab new information, compute
deltas, and publish those deltas.
"""
LOG.info("%s:: polling", self.name)
self.prior_state = dict(self.state) # copying self.state
self.last_error = None # non-None only when last poll errored
try:
self.update_from_datasource() # sets self.state
# publish those tables with polling update methods
overall_change_made = False
for tablename in self.update_methods:
use_snapshot = tablename not in self.prior_state
# Note(thread-safety): blocking call
this_table_change_made = self.publish(
tablename, self.state.get(tablename, set([])),
use_snapshot=use_snapshot)
overall_change_made = (overall_change_made
or this_table_change_made)
if overall_change_made:
self.exec_manager.evaluate_and_execute_actions()
except Exception as e:
self.last_error = e
LOG.exception("Datasource driver raised exception")
self.last_updated_time = datetime.datetime.now()
self.number_of_updates += 1
LOG.info("%s:: finished polling", self.name)
def json_ingester_webhook_handler(self, table_name, body):
def get_exactly_one_jsonpath_match(
jsonpath, jsondata, custom_error_msg):
jsonpath_expr = parser.parse(jsonpath)
matches = jsonpath_expr.find(jsondata)
if len(matches) != 1:
raise exception.BadRequest(
custom_error_msg.format(jsonpath, jsondata))
return matches[0].value
try:
webhook_config = self._config['tables'][table_name]['webhook']
except KeyError:
raise exception.NotFound(
'In JSON Ingester: "{}", the table "{}" either does not exist '
'or is not configured for webhook.'.format(
self.name, table_name))
json_record = get_exactly_one_jsonpath_match(
webhook_config['record_jsonpath'], body,
'In identifying JSON record from webhook body, the configured '
'jsonpath expression "{}" fails to obtain exactly one match on '
'webhook body "{}".')
json_id = get_exactly_one_jsonpath_match(
webhook_config['id_jsonpath'], json_record,
'In identifying ID from JSON record, the configured jsonpath '
'expression "{}" fails to obtain exactly one match on JSON record'
' "{}".')
self._webhook_update_table(table_name, key=json_id, data=json_record)
self.exec_manager.evaluate_and_execute_actions()
def _webhook_update_table(self, table_name, key, data):
key_string = json.dumps(key, sort_keys=True)
PGSQL_MAX_INDEXABLE_SIZE = 2712
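# 2712 bytes matches the maximum btree index entry size PostgreSQL
# reports with the default 8 kB page size (assumed here; newer postgres
# releases enforce a slightly smaller limit), so oversized keys are
# rejected up front rather than failing on insert.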
if len(key_string) > PGSQL_MAX_INDEXABLE_SIZE:
raise exception.BadRequest(
'The supplied key ({}) exceeds the max indexable size ({}) in '
'PostgreSQL.'.format(key_string, PGSQL_MAX_INDEXABLE_SIZE))
insert_statement = """INSERT INTO {}.{}
VALUES(%s, %s);"""
delete_tuple_statement = """
DELETE FROM {}.{} WHERE _key = %s;"""
conn = None
try:
conn = psycopg2.connect(cfg.CONF.json_ingester.db_connection)
conn.set_session(
isolation_level=psycopg2.extensions.
ISOLATION_LEVEL_READ_COMMITTED,
readonly=False, autocommit=False)
cur = conn.cursor()
# delete the appropriate row from table
cur.execute(sql.SQL(delete_tuple_statement).format(
sql.Identifier(self.name),
sql.Identifier(table_name)),
(key_string,))
# insert new row into table
cur.execute(sql.SQL(insert_statement).format(
sql.Identifier(self.name),
sql.Identifier(table_name)),
(json.dumps(data), key_string))
conn.commit()
cur.close()
except (Exception, psycopg2.Error):
LOG.exception("Error writing to DB")
finally:
if conn is not None:
conn.close()
def validate_lazy_tables(self):
'''override non-applicable parent method as no-op'''
pass
def initialize_translators(self):
'''override non-applicable parent method as no-op'''
pass
def get_snapshot(self, table_name):
raise NotImplementedError(
'This method should not be called in PollingJsonIngester.')
def get_row_data(self, table_id, *args, **kwargs):
raise NotImplementedError(
'This method should not be called in PollingJsonIngester.')
def register_translator(self, translator):
raise NotImplementedError(
'This method should not be called in PollingJsonIngester.')
def get_translator(self, translator_name):
raise NotImplementedError(
'This method should not be called in PollingJsonIngester.')
def get_translators(self):
raise NotImplementedError(
'This method should not be called in PollingJsonIngester.')
class JsonIngesterEndpoints(data_service.DataServiceEndPoints):
def __init__(self, service):
super(JsonIngesterEndpoints, self).__init__(service)
# Note (thread-safety): blocking function
def json_ingester_webhook_handler(self, context, table_name, body):
# Note (thread-safety): blocking call
return self.service.json_ingester_webhook_handler(table_name, body)
class KeyMap(object):
'''Map associating a unique integer key with each hashable object'''
_PY_MIN_INT = -sys.maxsize - 1 # minimum primitive integer supported
_PGSQL_MIN_BIGINT = -2**63 # minimum BIGINT supported in postgreSQL
# reference: https://www.postgresql.org/docs/9.4/datatype-numeric.html
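# Illustrative usage (values are made up):
#   km = KeyMap()
#   k1 = km.add_and_get_key('{"id": 1}')  # smallest usable key
#   k2 = km.add_and_get_key('{"id": 2}')  # next incremental key (k1 + 1)
#   km.add_and_get_key('{"id": 1}')       # existing datum: returns k1 again
#   km.remove_and_get_key('{"id": 1}')    # returns k1 and frees it for reuse
#   km.add_and_get_key('{"id": 3}')       # may reuse the reclaimed key k1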
def __init__(self):
self._key_mapping = {}
self._reclaimed_free_keys = set([])
self._next_incremental_key = max(
self._PY_MIN_INT, self._PGSQL_MIN_BIGINT)  # start from the lowest key supported by both
def add_and_get_key(self, datum):
'''Add a datum and return associated key'''
if datum in self._key_mapping:
return self._key_mapping[datum]
else:
try:
next_key = self._reclaimed_free_keys.pop()
except KeyError:
next_key = self._next_incremental_key
self._next_incremental_key += 1
self._key_mapping[datum] = next_key
return next_key
def remove_and_get_key(self, datum):
'''Remove a datum and return associated key'''
key = self._key_mapping.pop(datum)
self._reclaimed_free_keys.add(key)
return key
def clear(self):
'''Remove all data and keys'''
self.__init__()
def __len__(self):
return len(self._key_mapping)