Use Oslo Config global CONF object rather than Flask config.
Update anything using configuration items to use the global CONF object from Oslo Config rather than using the Flask config as the Oslo Config CONF object can be used outside of the Flask application context. Change-Id: Ie7eafd322ab1a6f57ff1e8e5b66e27079caa4aba
This commit is contained in:
parent
e5ae17d85c
commit
2341f2acaf
|
@ -17,6 +17,7 @@ from flask import jsonify
|
|||
from flask import request
|
||||
import magic
|
||||
import os
|
||||
from oslo_config.cfg import CONF
|
||||
from python_nemesis.db.utilities import add_request
|
||||
from python_nemesis.db.utilities import create_or_renew_by_hash
|
||||
from python_nemesis.db.utilities import get_file_by_sha512_hash
|
||||
|
@ -102,7 +103,8 @@ def post_file():
|
|||
file_dict = file.to_dict()
|
||||
|
||||
# Upload to swift and remove the local temp file.
|
||||
upload_to_swift(filename, file_uuid)
|
||||
container = CONF.swift.container.encode('utf-8')
|
||||
upload_to_swift(container, filename, file_uuid)
|
||||
os.remove(filename)
|
||||
|
||||
# Send message to worker queue with file details.
|
||||
|
|
|
@ -91,3 +91,24 @@ def create_or_renew_by_hash(hashes, file_size, file_type=None):
|
|||
file_type,
|
||||
current_user.user_id)
|
||||
return file
|
||||
|
||||
|
||||
def get_file_by_id(file_id):
|
||||
try:
|
||||
result = db.session.query(Files). \
|
||||
filter(Files.file_id == file_id).one()
|
||||
except NoResultFound:
|
||||
result = None
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def update_status_by_file_id(file_id, new_status):
|
||||
update_file = get_file_by_id(file_id)
|
||||
|
||||
if update_file:
|
||||
update_file.status = new_status
|
||||
db.session.commit()
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
|
||||
|
||||
class NemesisPlugin(object):
|
||||
plugin_name = 'null_plugin'
|
||||
|
||||
def __init__(self, file_path):
|
||||
self.file_path = file_path
|
||||
|
|
|
@ -12,34 +12,35 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from flask import current_app
|
||||
import os
|
||||
from oslo_config.cfg import CONF
|
||||
import swiftclient.client as swiftclient
|
||||
|
||||
|
||||
def upload_to_swift(filename, file_id):
|
||||
config = current_app.config['cfg']
|
||||
auth_version = config.swift.auth_version
|
||||
swift_session = swiftclient.Connection(authurl=config.swift.auth_uri,
|
||||
user=config.swift.user,
|
||||
key=config.swift.password,
|
||||
tenant_name=config.swift.project,
|
||||
def get_swift_session():
|
||||
auth_version = CONF.swift.auth_version
|
||||
swift_session = swiftclient.Connection(authurl=CONF.swift.auth_uri,
|
||||
user=CONF.swift.user,
|
||||
key=CONF.swift.password,
|
||||
tenant_name=CONF.swift.project,
|
||||
auth_version=auth_version)
|
||||
return swift_session
|
||||
|
||||
|
||||
def upload_to_swift(container, filename, file_id):
|
||||
swift_session = get_swift_session()
|
||||
with open(os.path.join(filename), 'rb') as upload_file:
|
||||
container = config.swift.container.encode('utf-8')
|
||||
file_id = str(file_id).encode('utf-8')
|
||||
swift_session.put_object(container, file_id, upload_file)
|
||||
|
||||
|
||||
def download_from_swift(file_uuid):
|
||||
config = current_app.config['cfg']
|
||||
auth_version = config.swift.auth_version
|
||||
swift_session = swiftclient.Connection(authurl=config.swift.auth_uri,
|
||||
user=config.swift.user,
|
||||
key=config.swift.password,
|
||||
tenant_name=config.swift.project,
|
||||
auth_version=auth_version)
|
||||
def download_from_swift(container, file_uuid):
|
||||
swift_session = get_swift_session()
|
||||
obj = swift_session.get_object(container, file_uuid)
|
||||
with open('/tmp/%s' % file_uuid, 'wb') as download_file:
|
||||
download_file.write(obj[1])
|
||||
|
||||
container = config.swift.container.encode('utf-8')
|
||||
print(swift_session.get_object(container, file_uuid))
|
||||
|
||||
def delete_from_swift(container, file_uuid):
|
||||
swift_session = get_swift_session()
|
||||
swift_session.delete_object(container, file_uuid)
|
||||
|
|
|
@ -12,10 +12,13 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from flask import current_app
|
||||
from oslo_config.cfg import CONF
|
||||
import oslo_messaging
|
||||
from python_nemesis.db.utilities import update_status_by_file_id
|
||||
from python_nemesis.extensions import log
|
||||
from python_nemesis.swift import delete_from_swift
|
||||
from python_nemesis.swift import download_from_swift
|
||||
from python_nemesis.worker_app import create_worker_app
|
||||
|
||||
|
||||
class NewFileEndpoint(object):
|
||||
|
@ -23,18 +26,30 @@ class NewFileEndpoint(object):
|
|||
event_type='nemsis.new_file')
|
||||
|
||||
def info(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||
file_uuid = payload['file_uuid']
|
||||
file_id = payload['file_id']
|
||||
log.logger.info("Fetched file_id %s to work on from the queue."
|
||||
% file_id)
|
||||
with create_worker_app().app_context():
|
||||
file_uuid = payload['file_uuid']
|
||||
file_id = payload['file_id']
|
||||
|
||||
log.logger.info("Downloading file from Swift for analysis.")
|
||||
download_from_swift(file_uuid)
|
||||
container = CONF.swift.container.encode('utf-8')
|
||||
log.logger.info("Fetched file_id %s to work on from the queue."
|
||||
% file_id)
|
||||
|
||||
log.logger.info("Downloading file from Swift for analysis.")
|
||||
download_from_swift(container, file_uuid)
|
||||
log.logger.info("Fetched file to /tmp/%s" % file_uuid)
|
||||
|
||||
log.logger.info("Running analysis plugins.")
|
||||
log.logger.info("Updating file analysis ")
|
||||
|
||||
log.logger.info("Cleaning up analysis subject.")
|
||||
delete_from_swift('incoming_files', file_uuid)
|
||||
|
||||
log.logger.info("Setting file status to complete.")
|
||||
update_status_by_file_id(file_id, 'complete')
|
||||
|
||||
|
||||
def run_worker():
|
||||
cfg = current_app.config['cfg']
|
||||
transport = oslo_messaging.get_notification_transport(cfg)
|
||||
transport = oslo_messaging.get_notification_transport(CONF)
|
||||
|
||||
targets = [
|
||||
oslo_messaging.Target(topic='nemesis_notifications')
|
||||
|
@ -47,6 +62,7 @@ def run_worker():
|
|||
server = oslo_messaging.get_notification_listener(transport,
|
||||
targets,
|
||||
endpoints,
|
||||
executor='threading',
|
||||
pool=pool)
|
||||
|
||||
server.start()
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
from python_nemesis.base_app import configure_app
|
||||
from python_nemesis.base_app import configure_extensions
|
||||
from python_nemesis.base_app import create_app
|
||||
from python_nemesis.worker import run_worker
|
||||
|
||||
|
||||
def create_worker_app():
|
||||
|
@ -24,6 +23,7 @@ def create_worker_app():
|
|||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
from python_nemesis.worker import run_worker
|
||||
app = create_worker_app()
|
||||
with app.app_context():
|
||||
run_worker()
|
||||
|
|
Loading…
Reference in New Issue