diff --git a/muranoagent/app.py b/muranoagent/app.py index 7b3f901a..5852d905 100644 --- a/muranoagent/app.py +++ b/muranoagent/app.py @@ -20,21 +20,17 @@ import time from oslo_log import log as logging from oslo_service import service from oslo_utils import strutils -import semantic_version -import six -from muranoagent import bunch from muranoagent.common import config from muranoagent.common import messaging -from muranoagent import exceptions as exc from muranoagent import execution_plan_queue from muranoagent import execution_plan_runner from muranoagent import execution_result as ex_result +from muranoagent import validation CONF = config.CONF LOG = logging.getLogger(__name__) -max_format_version = semantic_version.Spec('<=2.2.0') class MuranoAgent(service.Service): @@ -80,11 +76,29 @@ class MuranoAgent(service.Service): if plan is not None: LOG.debug("Got an execution plan '{0}':".format( strutils.mask_password(str(plan)))) - self._run(plan) + if self._verify_plan(plan): + self._run(plan) return next(msg_iterator) + def _verify_plan(self, plan): + try: + validation.validate_plan(plan) + return True + except Exception as err: + try: + execution_result = ex_result.ExecutionResult.from_error( + err, plan) + if 'ReplyTo' in plan and CONF.enable_dynamic_result_queue: + execution_result['ReplyTo'] = plan.ReplyTo + + self._send_result(execution_result) + except ValueError: + LOG.warning('Execution result is not produced') + finally: + return False + def _run(self, plan): try: with execution_plan_runner.ExecutionPlanRunner(plan) as runner: @@ -135,12 +149,18 @@ class MuranoAgent(service.Service): prefetch_count=1) as subscription: while True: msg = subscription.get_message(timeout=5) - if msg is not None and isinstance(msg.body, dict): - self._handle_message(msg) + if msg is not None: + try: + self._queue.put_execution_plan( + msg.body, + msg.signature, + msg.id, + msg.reply_to) + finally: + msg.ack() delay = 5 if msg is not None: - msg.ack() yield except KeyboardInterrupt: break @@ -148,138 +168,3 @@ class MuranoAgent(service.Service): LOG.warning('Communication error', exc_info=True) time.sleep(delay) delay = min(delay * 1.2, 60) - - def _handle_message(self, msg): - if 'ID' not in msg.body and msg.id: - msg.body['ID'] = msg.id - if 'ReplyTo' not in msg.body and msg.reply_to: - msg.body['ReplyTo'] = msg.reply_to - try: - self._verify_plan(msg.body) - self._queue.put_execution_plan(msg.body) - except Exception as err: - try: - execution_result = ex_result.ExecutionResult.from_error( - err, bunch.Bunch(msg.body)) - if ('ReplyTo' in msg.body) and \ - CONF.enable_dynamic_result_queue: - execution_result['ReplyTo'] = msg.body.get('ReplyTo') - - self._send_result(execution_result) - except ValueError: - LOG.warning('Execution result is not produced') - - def _verify_plan(self, plan): - plan_format_version = semantic_version.Version( - plan.get('FormatVersion', '1.0.0')) - - if plan_format_version not in max_format_version: - # NOTE(kazitsev) this is Version in Spec not str in str - raise exc.IncorrectFormat( - 9, - "Unsupported format version {0} " - "(I support versions {1})".format( - plan_format_version, max_format_version)) - - for attr in ('Scripts', 'Files'): - if attr not in plan: - raise exc.IncorrectFormat( - 2, '{0} is not in the execution plan'.format(attr)) - - for attr in ('Scripts', 'Files', 'Options'): - if attr in plan and not isinstance( - plan[attr], dict): - raise exc.IncorrectFormat( - 2, '{0} is not a dictionary'.format(attr)) - - for name, script in plan.get('Scripts', {}).items(): - self._validate_script(name, script, plan_format_version, plan) - - for key, plan_file in plan.get('Files', {}).items(): - self._validate_file(plan_file, key, plan_format_version) - - def _validate_script(self, name, script, plan_format_version, plan): - for attr in ('Type', 'EntryPoint'): - if attr not in script or not isinstance(script[attr], - six.string_types): - raise exc.IncorrectFormat( - 2, 'Incorrect {0} entry in script {1}'.format( - attr, name)) - - if plan_format_version in semantic_version.Spec('>=2.0.0,<2.1.0'): - if script['Type'] != 'Application': - raise exc.IncorrectFormat( - 2, 'Type {0} is not valid for format {1}'.format( - script['Type'], plan_format_version)) - if script['EntryPoint'] not in plan.get('Files', {}): - raise exc.IncorrectFormat( - 2, 'Script {0} misses entry point {1}'.format( - name, script['EntryPoint'])) - - if plan_format_version in semantic_version.Spec('>=2.1.0'): - if script['Type'] not in ('Application', 'Chef', 'Puppet'): - raise exc.IncorrectFormat( - 2, 'Script has not a valid type {0}'.format( - script['Type'])) - if (script['Type'] == 'Application' and script['EntryPoint'] - not in plan.get('Files', {})): - raise exc.IncorrectFormat( - 2, 'Script {0} misses entry point {1}'.format( - name, script['EntryPoint'])) - elif (script['Type'] != 'Application' and - "::" not in script['EntryPoint']): - raise exc.IncorrectFormat( - 2, 'Wrong EntryPoint {0} for Puppet/Chef ' - 'executors. :: needed'.format(script['EntryPoint'])) - - for option in script['Options']: - if option in ('useBerkshelf', 'berksfilePath'): - if plan_format_version in semantic_version.Spec('<2.2.0'): - raise exc.IncorrectFormat( - 2, 'Script has an option {0} invalid ' - 'for version {1}'.format(option, - plan_format_version)) - elif script['Type'] != 'Chef': - raise exc.IncorrectFormat( - 2, 'Script has an option {0} invalid ' - 'for type {1}'.format(option, script['Type'])) - - for additional_file in script.get('Files', []): - mns_error = ('Script {0} misses file {1}'. - format(name, additional_file)) - if isinstance(additional_file, dict): - if (list(additional_file.keys())[0] not in - plan.get('Files', {}).keys()): - raise exc.IncorrectFormat(2, mns_error) - elif additional_file not in plan.get('Files', {}): - raise exc.IncorrectFormat(2, mns_error) - - def _validate_file(self, plan_file, key, format_version): - if format_version in semantic_version.Spec('>=2.0.0,<2.1.0'): - for plan in plan_file.keys(): - if plan in ('Type', 'URL'): - raise exc.IncorrectFormat( - 2, 'Download file is {0} not valid for this ' - 'version {1}'.format(key, format_version)) - - if 'Type' in plan_file: - for attr in ('Type', 'URL', 'Name'): - if attr not in plan_file: - raise exc.IncorrectFormat( - 2, - 'Incorrect {0} entry in file {1}'.format(attr, key)) - - elif 'Body' in plan_file: - for attr in ('BodyType', 'Body', 'Name'): - if attr not in plan_file: - raise exc.IncorrectFormat( - 2, 'Incorrect {0} entry in file {1}'.format( - attr, key)) - - if plan_file['BodyType'] not in ('Text', 'Base64'): - raise exc.IncorrectFormat( - 2, 'Incorrect BodyType in file {1}'.format(key)) - else: - raise exc.IncorrectFormat( - 2, 'Invalid file {0}: {1}'.format( - key, plan_file)) diff --git a/muranoagent/common/config.py b/muranoagent/common/config.py index 7dd39ba1..b3eb1bca 100644 --- a/muranoagent/common/config.py +++ b/muranoagent/common/config.py @@ -24,17 +24,20 @@ from muranoagent import version CONF = cfg.CONF -storage_opt = [ +opts = [ cfg.StrOpt('storage', default='/var/murano/plans', - help='Directory to store execution plans') + help='Directory to store execution plans'), + + cfg.StrOpt('engine_key', + help='Public key of murano-engine') ] -message_routing_opt = [ - cfg.BoolOpt('enable_dynamic_result_queue', help='Enable taking dynamic ' - 'result queue from task field reply_to', - default=False) -] +message_routing_opt = cfg.BoolOpt( + 'enable_dynamic_result_queue', + help='Enable taking dynamic result queue from task field reply_to', + default=False) + rabbit_opts = [ cfg.HostAddressOpt('host', @@ -79,8 +82,8 @@ rabbit_opts = [ ] -CONF.register_cli_opts(storage_opt) -CONF.register_cli_opts(message_routing_opt) +CONF.register_opts(opts) +CONF.register_cli_opt(message_routing_opt) CONF.register_opts(rabbit_opts, group='rabbitmq') logging.register_options(CONF) diff --git a/muranoagent/common/messaging/message.py b/muranoagent/common/messaging/message.py index 22dd0653..cf2a91a1 100644 --- a/muranoagent/common/messaging/message.py +++ b/muranoagent/common/messaging/message.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import anyjson from oslo_log import log as logging LOG = logging.getLogger("murano-common.messaging") @@ -27,20 +26,17 @@ class Message(object): if message_handle: self.id = message_handle.properties.get('message_id') self._reply_to = message_handle.properties.get('reply_to') + self._signature = message_handle.headers.get('signature') else: self.id = None self._reply_to = None + self._signature = None - try: - if message_handle: - if isinstance(message_handle.body, bytes): - message_handle.body = message_handle.body.decode('utf-8') - self.body = anyjson.loads(message_handle.body) - else: - self.body = None - except ValueError: + if message_handle: + self.body = message_handle.body + + else: self.body = None - LOG.exception('Message is not in JSON format') @property def body(self): @@ -64,3 +60,7 @@ class Message(object): def ack(self): self._message_handle.ack() + + @property + def signature(self): + return self._signature diff --git a/muranoagent/execution_plan_queue.py b/muranoagent/execution_plan_queue.py index 9a019155..7a8c1d22 100644 --- a/muranoagent/execution_plan_queue.py +++ b/muranoagent/execution_plan_queue.py @@ -13,21 +13,29 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 import json import os import shutil import time +from cryptography.hazmat import backends +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives import serialization +from oslo_log import log as logging from muranoagent import bunch from muranoagent.common import config CONF = config.CONF +LOG = logging.getLogger(__name__) class ExecutionPlanQueue(object): plan_filename = 'plan.json' result_filename = 'result.json' + stamp_filename = 'stamp' def __init__(self): self._plans_folder = os.path.join(CONF.storage, 'plans') @@ -38,16 +46,25 @@ class ExecutionPlanQueue(object): os.chmod(self._plans_folder, 0o700) except OSError: pass + self._key = None if not CONF.engine_key \ + else serialization.load_pem_public_key( + CONF.engine_key, backends.default_backend()) + self._load_stamp() - def put_execution_plan(self, execution_plan): + def put_execution_plan(self, execution_plan, signature, msg_id, reply_to): timestamp = str(int(time.time() * 10000)) # execution_plan['_timestamp'] = timestamp folder_path = os.path.join(self._plans_folder, timestamp) os.mkdir(folder_path) - file_path = os.path.join( + plan_file_path = os.path.join( folder_path, ExecutionPlanQueue.plan_filename) - with open(file_path, 'w') as out_file: - out_file.write(json.dumps(execution_plan)) + with open(plan_file_path, 'wb') as out_file: + out_file.write(json.dumps({ + 'Data': base64.b64encode(execution_plan), + 'Signature': base64.b64encode(signature or ''), + 'ID': msg_id, + 'ReplyTo': reply_to + })) def _get_first_timestamp(self, filename): def predicate(folder): @@ -70,11 +87,46 @@ class ExecutionPlanQueue(object): return json.loads(json_file.read()), timestamp def get_execution_plan(self): - ep, timestamp = self._get_first_file(ExecutionPlanQueue.plan_filename) - if ep is None: - return None - ep['_timestamp'] = timestamp - return bunch.Bunch(ep) + while True: + ep_info, timestamp = self._get_first_file( + ExecutionPlanQueue.plan_filename) + if ep_info is None: + return None + + try: + data = base64.b64decode(ep_info['Data']) + if self._key: + signature = base64.b64decode(ep_info['Signature']) + self._verify_signature(data, signature) + + ep = json.loads(data) + if not isinstance(ep, dict): + raise ValueError('Message is not a document') + + stamp = ep.get('Stamp', -1) + if stamp >= 0: + if stamp <= self._last_stamp: + raise ValueError('Dropping old/duplicate message') + self._save_stamp(stamp) + + if 'ID' not in ep: + ep['ID'] = ep_info['ID'] + if 'ReplyTo' not in ep: + ep['ReplyTo'] = ep_info['ReplyTo'] + + ep['_timestamp'] = timestamp + return bunch.Bunch(ep) + except Exception as ex: + LOG.exception(ex) + self.remove(timestamp) + + def _verify_signature(self, data, signature): + if not signature: + raise ValueError("Required signature was not found") + self._key.verify( + signature, + CONF.rabbitmq.input_queue + data, + padding.PKCS1v15(), hashes.SHA256()) def put_execution_result(self, result, execution_plan): timestamp = execution_plan['_timestamp'] @@ -93,3 +145,19 @@ class ExecutionPlanQueue(object): def get_execution_plan_result(self): return self._get_first_file( ExecutionPlanQueue.result_filename) + + def _load_stamp(self): + plan_file_path = os.path.join( + self._plans_folder, ExecutionPlanQueue.stamp_filename) + if os.path.exists(plan_file_path): + with open(plan_file_path) as f: + self._last_stamp = int(f.read()) + else: + self._last_stamp = 0 + + def _save_stamp(self, stamp): + plan_file_path = os.path.join( + self._plans_folder, ExecutionPlanQueue.stamp_filename) + with open(plan_file_path, 'w') as f: + f.write(str(stamp)) + self._last_stamp = stamp diff --git a/muranoagent/tests/unit/test_app.py b/muranoagent/tests/unit/test_app.py index 6d4260ce..ea9ed65b 100644 --- a/muranoagent/tests/unit/test_app.py +++ b/muranoagent/tests/unit/test_app.py @@ -16,8 +16,6 @@ import fixtures import mock import ssl as ssl_module -from oslo_service import sslutils - from muranoagent import app from muranoagent import bunch from muranoagent.common import config as cfg @@ -25,6 +23,7 @@ from muranoagent.common.messaging import mqclient from muranoagent import exceptions as exc from muranoagent.tests.unit import base from muranoagent.tests.unit import execution_plan as ep +from muranoagent import validation CONF = cfg.CONF @@ -35,11 +34,15 @@ class TestApp(base.MuranoAgentTestCase, fixtures.FunctionFixture): @mock.patch('os.path.exists') def setUp(self, mock_path, mock_chmod): super(TestApp, self).setUp() - mock_path.return_value = True + mock_path.side_effect = self._exists self.agent = app.MuranoAgent() CONF.set_override('storage', 'cache') self.addCleanup(CONF.clear_override, 'storage') + @staticmethod + def _exists(path): + return 'stamp' not in path + def test_verify_execution_plan_downloable(self): template = self.useFixture(ep.ExPlanDownloable()).execution_plan self.agent._verify_plan(template) @@ -50,13 +53,13 @@ class TestApp(base.MuranoAgentTestCase, fixtures.FunctionFixture): FormatVersion='0.0.0', ) self.assertRaises(exc.IncorrectFormat, - self.agent._verify_plan, template) + validation.validate_plan, template) def test_verify_over_max_execution_plan(self): template = self.useFixture(ep.ExPlanApplication()).execution_plan template['FormatVersion'] = '1000.0.0' self.assertRaises(exc.IncorrectFormat, - self.agent._verify_plan, template) + validation.validate_plan, template) def test_verify_execution_application(self): template = self.useFixture(ep.ExPlanApplication()).execution_plan @@ -71,12 +74,12 @@ class TestApp(base.MuranoAgentTestCase, fixtures.FunctionFixture): } template['FormatVersion'] = '2.0.0' self.assertRaises(exc.IncorrectFormat, - self.agent._verify_plan, template) + validation.validate_plan, template) def test_verify_execution_plan_no_files(self): template = self.useFixture(ep.ExPlanDownloableNoFiles()).execution_plan self.assertRaises(exc.IncorrectFormat, - self.agent._verify_plan, template) + validation.validate_plan, template) def test_verify_execution_plan_berkshelf(self): template = self.useFixture(ep.ExPlanBerkshelf()).execution_plan @@ -85,7 +88,7 @@ class TestApp(base.MuranoAgentTestCase, fixtures.FunctionFixture): def test_verify_execution_plan_berkshelf_wrong_version(self): template = self.useFixture(ep.ExPlanBerkWrongVersion()).execution_plan self.assertRaises(exc.IncorrectFormat, - self.agent._verify_plan, template) + validation.validate_plan, template) @mock.patch.object(mqclient, 'random', autospec=True) @mock.patch.object(mqclient, 'kombu', autospec=True) diff --git a/muranoagent/validation.py b/muranoagent/validation.py new file mode 100644 index 00000000..38dff441 --- /dev/null +++ b/muranoagent/validation.py @@ -0,0 +1,140 @@ +# Copyright (c) 2017 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import semantic_version +import six + +from muranoagent import exceptions as exc + + +max_format_version = semantic_version.Spec('<=2.2.0') + + +def validate_plan(plan): + plan_format_version = semantic_version.Version( + plan.get('FormatVersion', '1.0.0')) + + if plan_format_version not in max_format_version: + # NOTE(kazitsev) this is Version in Spec not str in str + raise exc.IncorrectFormat( + 9, + "Unsupported format version {0} " + "(I support versions {1})".format( + plan_format_version, max_format_version)) + + for attr in ('Scripts', 'Files'): + if attr not in plan: + raise exc.IncorrectFormat( + 2, '{0} is not in the execution plan'.format(attr)) + + for attr in ('Scripts', 'Files', 'Options'): + if attr in plan and not isinstance( + plan[attr], dict): + raise exc.IncorrectFormat( + 2, '{0} is not a dictionary'.format(attr)) + + for name, script in plan.get('Scripts', {}).items(): + _validate_script(name, script, plan_format_version, plan) + + for key, plan_file in plan.get('Files', {}).items(): + _validate_file(plan_file, key, plan_format_version) + + +def _validate_script(name, script, plan_format_version, plan): + for attr in ('Type', 'EntryPoint'): + if attr not in script or not isinstance(script[attr], + six.string_types): + raise exc.IncorrectFormat( + 2, 'Incorrect {0} entry in script {1}'.format( + attr, name)) + + if plan_format_version in semantic_version.Spec('>=2.0.0,<2.1.0'): + if script['Type'] != 'Application': + raise exc.IncorrectFormat( + 2, 'Type {0} is not valid for format {1}'.format( + script['Type'], plan_format_version)) + if script['EntryPoint'] not in plan.get('Files', {}): + raise exc.IncorrectFormat( + 2, 'Script {0} misses entry point {1}'.format( + name, script['EntryPoint'])) + + if plan_format_version in semantic_version.Spec('>=2.1.0'): + if script['Type'] not in ('Application', 'Chef', 'Puppet'): + raise exc.IncorrectFormat( + 2, 'Script has not a valid type {0}'.format( + script['Type'])) + if (script['Type'] == 'Application' and script['EntryPoint'] + not in plan.get('Files', {})): + raise exc.IncorrectFormat( + 2, 'Script {0} misses entry point {1}'.format( + name, script['EntryPoint'])) + elif (script['Type'] != 'Application' and + "::" not in script['EntryPoint']): + raise exc.IncorrectFormat( + 2, 'Wrong EntryPoint {0} for Puppet/Chef ' + 'executors. :: needed'.format(script['EntryPoint'])) + + for option in script['Options']: + if option in ('useBerkshelf', 'berksfilePath'): + if plan_format_version in semantic_version.Spec('<2.2.0'): + raise exc.IncorrectFormat( + 2, 'Script has an option {0} invalid ' + 'for version {1}'.format(option, + plan_format_version)) + elif script['Type'] != 'Chef': + raise exc.IncorrectFormat( + 2, 'Script has an option {0} invalid ' + 'for type {1}'.format(option, script['Type'])) + + for additional_file in script.get('Files', []): + mns_error = ('Script {0} misses file {1}'.format( + name, additional_file)) + if isinstance(additional_file, dict): + if (list(additional_file.keys())[0] not in + plan.get('Files', {}).keys()): + raise exc.IncorrectFormat(2, mns_error) + elif additional_file not in plan.get('Files', {}): + raise exc.IncorrectFormat(2, mns_error) + + +def _validate_file(plan_file, key, format_version): + if format_version in semantic_version.Spec('>=2.0.0,<2.1.0'): + for plan in plan_file.keys(): + if plan in ('Type', 'URL'): + raise exc.IncorrectFormat( + 2, 'Download file is {0} not valid for this ' + 'version {1}'.format(key, format_version)) + + if 'Type' in plan_file: + for attr in ('Type', 'URL', 'Name'): + if attr not in plan_file: + raise exc.IncorrectFormat( + 2, + 'Incorrect {0} entry in file {1}'.format(attr, key)) + + elif 'Body' in plan_file: + for attr in ('BodyType', 'Body', 'Name'): + if attr not in plan_file: + raise exc.IncorrectFormat( + 2, 'Incorrect {0} entry in file {1}'.format( + attr, key)) + + if plan_file['BodyType'] not in ('Text', 'Base64'): + raise exc.IncorrectFormat( + 2, 'Incorrect BodyType in file {0}'.format(key)) + else: + raise exc.IncorrectFormat( + 2, 'Invalid file {0}: {1}'.format( + key, plan_file)) diff --git a/requirements.txt b/requirements.txt index 40f34b5c..c3e567cb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,3 +14,4 @@ PyYAML>=3.10 # MIT six>=1.10.0 # MIT semantic-version>=2.3.1 # BSD requests>=2.14.2 # Apache-2.0 +cryptography>=1.9,!=2.0 # BSD/Apache-2.0