Add uber simple Oslo.Messaging notifier for sending messages.

Add uber simple Oslo.Messaging notifier for sending messages.

Change-Id: I5ac4bafbb02cb4dfb79e5892e5b9fc42d9162807
This commit is contained in:
Robert Putt 2017-12-03 17:02:10 +00:00
parent bb16e13ec1
commit 63dc98c723
5 changed files with 69 additions and 29 deletions

View File

@ -1,4 +1,6 @@
[DEFAULT] [DEFAULT]
rpc_backend = rabbit
transport_url = rabbit://<user>:<password>@<host>/<vhost>
[keystone_authtoken] [keystone_authtoken]
identity_uri = identity_uri =

View File

@ -12,22 +12,19 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import datetime
from flask import Blueprint from flask import Blueprint
from flask import jsonify from flask import jsonify
from flask import request from flask import request
from flask_keystone import current_user
import os import os
from python_nemesis.db.utilities import add_request from python_nemesis.db.utilities import add_request
from python_nemesis.db.utilities import create_new_file from python_nemesis.db.utilities import create_or_renew_by_hash
from python_nemesis.db.utilities import get_file_by_sha512_hash
from python_nemesis.db.utilities import search_by_hash from python_nemesis.db.utilities import search_by_hash
from python_nemesis.exceptions import general_handler from python_nemesis.exceptions import general_handler
from python_nemesis.exceptions import NemesisException from python_nemesis.exceptions import NemesisException
from python_nemesis.exceptions import NotFoundException from python_nemesis.exceptions import NotFoundException
from python_nemesis.extensions import db
from python_nemesis.extensions import log from python_nemesis.extensions import log
from python_nemesis.file_hasher import get_all_hashes from python_nemesis.file_hasher import get_all_hashes
from python_nemesis.notifications import submit_worker_notification
from python_nemesis.swift import upload_to_swift from python_nemesis.swift import upload_to_swift
import uuid import uuid
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
@ -69,47 +66,37 @@ def lookup_hash(req_hash):
@V1_API.route('/v1/file', methods=['POST']) @V1_API.route('/v1/file', methods=['POST'])
def post_file(): def post_file():
filename = secure_filename(str(uuid.uuid4())) file_uuid = secure_filename(str(uuid.uuid4()))
filename = '/tmp/%s' % filename filename = '/tmp/%s' % file_uuid
file = request.files['file'] file = request.files['file']
if 'Content-Range' in request.headers: if 'Content-Range' in request.headers:
# extract starting byte from Content-Range header string # Extract starting byte from Content-Range header string.
range_str = request.headers['Content-Range'] range_str = request.headers['Content-Range']
start_bytes = int(range_str.split(' ')[1].split('-')[0]) start_bytes = int(range_str.split(' ')[1].split('-')[0])
# append chunk to the file on disk, or create new # Append chunk to the file on disk, or create new.
with open(filename, 'a') as f: with open(filename, 'a') as f:
f.seek(start_bytes) f.seek(start_bytes)
f.write(file.stream.read()) f.write(file.stream.read())
else: else:
# this is not a chunked request, so just save the whole file # This is not a chunked request, so just save the whole file.
file.save(filename) file.save(filename)
# Generate hash of file, and create new, or renew existing db row.
file_hashes = get_all_hashes(filename) file_hashes = get_all_hashes(filename)
current_file = get_file_by_sha512_hash(file_hashes['sha512'])
file_size = os.path.getsize(filename) file_size = os.path.getsize(filename)
file = create_or_renew_by_hash(file_hashes, file_size)
if current_file: file_id = file.file_id
current_file.last_updated = datetime.datetime.now() file_dict = file.to_dict()
current_file.status = 'analysing'
db.session.commit()
file_id = current_file.file_id
file_dict = current_file.to_dict()
else:
file = create_new_file(file_hashes['md5'],
file_hashes['sha1'],
file_hashes['sha256'],
file_hashes['sha512'],
file_size,
current_user.user_id)
file_id = file.file_id
file_dict = file.to_dict()
# Upload to swift and remove the local temp file. # Upload to swift and remove the local temp file.
upload_to_swift(filename, file_id) upload_to_swift(filename, file_uuid)
os.remove(filename) os.remove(filename)
# Send message to worker queue with file details.
worker_msg = {"file_uuid": file_uuid, "file_id": file_id}
submit_worker_notification(worker_msg)
return jsonify(file_dict) return jsonify(file_dict)

View File

@ -13,6 +13,7 @@
from flask import Flask from flask import Flask
import os import os
from oslo_config import cfg from oslo_config import cfg
import oslo_messaging
from python_nemesis.config import collect_sqlalchemy_opts from python_nemesis.config import collect_sqlalchemy_opts
from python_nemesis.config import register_opts from python_nemesis.config import register_opts
from python_nemesis.extensions import db from python_nemesis.extensions import db
@ -68,6 +69,15 @@ def configure_extensions(app):
keystone.init_app(app) keystone.init_app(app)
def configure_notifier(app):
transport = oslo_messaging.get_notification_transport(app.config['cfg'])
topics = ['nemesis_notifications']
app.config["notifier"] = oslo_messaging.Notifier(transport,
'nemesis.api',
driver='messagingv2',
topics=topics)
def create_app(app_name=None, blueprints=None): def create_app(app_name=None, blueprints=None):
"""Create the flask app. """Create the flask app.
@ -84,6 +94,7 @@ def create_app(app_name=None, blueprints=None):
configure_app(app) configure_app(app)
configure_extensions(app) configure_extensions(app)
configure_notifier(app)
# Here we register the application blueprints. # Here we register the application blueprints.
from python_nemesis.api.v1 import V1_API from python_nemesis.api.v1 import V1_API

View File

@ -70,3 +70,22 @@ def create_new_file(md5_hash, sha1_hash, sha256_hash, sha512_hash,
db.session.add(file) db.session.add(file)
db.session.commit() db.session.commit()
return file return file
def create_or_renew_by_hash(hashes, file_size):
current_file = get_file_by_sha512_hash(hashes['sha512'])
if current_file:
current_file.last_updated = datetime.datetime.now()
current_file.status = 'analysing'
db.session.commit()
return current_file
else:
file = create_new_file(hashes['md5'],
hashes['sha1'],
hashes['sha256'],
hashes['sha512'],
file_size,
current_user.user_id)
return file

View File

@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from flask import current_app
def submit_worker_notification(msg):
notifier = current_app.config['notifier']
notifier.info({}, 'nemsis.new_file', msg)