From 071f99b3d8fa739a2563a63a4fc863b352c14246 Mon Sep 17 00:00:00 2001 From: Fabio Verboso Date: Tue, 30 Apr 2019 17:07:25 +0200 Subject: [PATCH] Results and Requests Iotronic logs every request sent to the board and its result Change-Id: Iec5d44de970c1bbd3e304185f340a4a99832474c --- iotronic/conductor/endpoints.py | 72 ++++++- iotronic/db/api.py | 53 +++++ .../76c628d60004_add_results_requests.py | 53 +++++ iotronic/db/sqlalchemy/api.py | 93 +++++++++ iotronic/db/sqlalchemy/models.py | 32 +++ iotronic/objects/__init__.py | 8 +- iotronic/objects/request.py | 195 ++++++++++++++++++ iotronic/objects/result.py | 154 ++++++++++++++ iotronic/wamp/agent.py | 17 +- iotronic/wamp/functions.py | 23 +++ iotronic/wamp/wampmessage.py | 21 +- 11 files changed, 704 insertions(+), 17 deletions(-) create mode 100644 iotronic/db/sqlalchemy/alembic/versions/76c628d60004_add_results_requests.py create mode 100644 iotronic/objects/request.py create mode 100644 iotronic/objects/result.py diff --git a/iotronic/conductor/endpoints.py b/iotronic/conductor/endpoints.py index ef5d7b7..34f6f8e 100644 --- a/iotronic/conductor/endpoints.py +++ b/iotronic/conductor/endpoints.py @@ -41,6 +41,27 @@ serializer = objects_base.IotronicObjectSerializer() Port = list() +# Method to compare two versions. +# Return 1 if v2 is smaller, +# -1 if v1 is smaller, +# 0 if equal +def versionCompare(v1, v2): + v1_list = v1.split(".")[:3] + v2_list = v2.split(".")[:3] + i = 0 + while (i < len(v1_list)): + + # v2 > v1 + if int(v2_list[i]) > int(v1_list[i]): + return -1 + # v1 > v1 + if int(v1_list[i]) > int(v2_list[i]): + return 1 + i += 1 + # v2 == v1 + return 0 + + def get_best_agent(ctx): agents = objects.WampAgent.list(ctx, filters={'online': True}) LOG.debug('found %d Agent(s).', len(agents)) @@ -233,13 +254,50 @@ class ConductorEndpoint(object): if not board.is_online(): raise exception.BoardNotConnected(board=board.uuid) - cctx = self.wamp_agent_client.prepare(server=board.agent) - res = cctx.call(ctx, 's4t_invoke_wamp', - wamp_rpc_call=full_wamp_call, - data=wamp_rpc_args) - res = wm.deserialize(res) + req_data = { + 'destination_uuid': board_uuid, + 'type': objects.request.BOARD, + 'status': objects.request.PENDING, + 'action': wamp_rpc_call + } + req = objects.Request(ctx, **req_data) + req.create() - return res + res_data = { + 'board_uuid': board_uuid, + 'request_uuid': req.uuid, + 'result': objects.result.RUNNING, + 'message': "" + } + + res = objects.Result(ctx, **res_data) + res.create() + + cctx = self.wamp_agent_client.prepare(server=board.agent) + + # for previous LR version (to be removed asap) + if (versionCompare(board.lr_version, "0.4.9") == -1): + + response = cctx.call(ctx, 's4t_invoke_wamp', + wamp_rpc_call=full_wamp_call, + data=wamp_rpc_args) + else: + + response = cctx.call(ctx, 's4t_invoke_wamp', + wamp_rpc_call=full_wamp_call, + req_uuid=req.uuid, + data=wamp_rpc_args) + + response = wm.deserialize(response) + + if (response.result != wm.RUNNING): + res.result = response.result + res.message = response.message + res.save() + req.status = objects.request.COMPLETED + req.save() + + return response def action_board(self, ctx, board_uuid, action, params): @@ -254,7 +312,7 @@ class ConductorEndpoint(object): try: result = self.execute_on_board(ctx, board_uuid, action, - (params)) + (params,)) except exception: return exception diff --git a/iotronic/db/api.py b/iotronic/db/api.py index d6ffdfb..c60f7f9 100644 --- a/iotronic/db/api.py +++ b/iotronic/db/api.py @@ -709,3 +709,56 @@ class Connection(object): :param enabled_webservice_id: The id or uuid of a enabled_webservice. """ + + @abc.abstractmethod + def create_request(self, values): + """Create a new webservice. + + :param values: A dict containing several items used to identify + and track the request + :returns: A request. + """ + + @abc.abstractmethod + def create_result(self, values): + """Create a new webservice. + + :param values: A dict containing several items used to identify + and track the result + :returns: A result. + """ + + @abc.abstractmethod + def get_result(self, board_uuid, request_uuid): + """get a result. + + :param board_uuid: the board uuid result. + :param request_uuid: the request_uuid. + :returns: A result. + """ + + @abc.abstractmethod + def update_result(self, result_id, values): + """Update properties of a result. + + :param result_id: The id or uuid of a fleet. + :param values: Dict of values to update. + :returns: A result. + """ + + @abc.abstractmethod + def update_request(self, request_id, values): + """Update properties of a result. + + :param request_id: The id or uuid of a fleet. + :param values: Dict of values to update. + :returns: A request. + """ + + @abc.abstractmethod + def get_results(self, request_uuid): + """get results of a request. + + :param request_uuid: the request_uuid. + :returns: a list of results + """ diff --git a/iotronic/db/sqlalchemy/alembic/versions/76c628d60004_add_results_requests.py b/iotronic/db/sqlalchemy/alembic/versions/76c628d60004_add_results_requests.py new file mode 100644 index 0000000..301e282 --- /dev/null +++ b/iotronic/db/sqlalchemy/alembic/versions/76c628d60004_add_results_requests.py @@ -0,0 +1,53 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# revision identifiers, used by Alembic. +revision = '76c628d60004' +down_revision = 'b98819997377' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.create_table('requests', + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('uuid', sa.String(length=36), nullable=False), + sa.Column('destination_uuid', sa.String(length=36), + nullable=False), + sa.Column('status', sa.String(length=10), nullable=False), + sa.Column('type', sa.Integer(), nullable=False), + sa.Column('action', sa.String(length=15), nullable=False), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('uuid', name='uniq_requests0uuid') + ) + + op.create_table('results', + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('result', sa.String(length=10), nullable=False), + sa.Column('message', sa.TEXT(), nullable=True), + sa.Column('board_uuid', sa.String(length=36), + nullable=False), + sa.Column('request_uuid', sa.String(length=36), + nullable=False), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('board_uuid', 'request_uuid', + name='uniq_request_on_board'), + sa.ForeignKeyConstraint(['board_uuid'], + ['boards.uuid'], ), + sa.ForeignKeyConstraint(['request_uuid'], + ['requests.uuid'], ) + ) diff --git a/iotronic/db/sqlalchemy/api.py b/iotronic/db/sqlalchemy/api.py index cdffdde..fd0b496 100644 --- a/iotronic/db/sqlalchemy/api.py +++ b/iotronic/db/sqlalchemy/api.py @@ -226,6 +226,15 @@ class Connection(api.Connection): query = query. \ filter(models.Port.board_uuid == filters['board_uuid']) + def _add_result_filters(self, query, filters): + if filters is None: + filters = [] + + if 'result' in filters: + query = query.filter(models.Result.result == filters['result']) + + return query + def _do_update_board(self, board_id, values): session = get_session() with session.begin(): @@ -1196,3 +1205,87 @@ class Connection(api.Connection): query = self._add_enabled_webservices_filters(query, filters) return _paginate_query(models.EnabledWebservice, limit, marker, sort_key, sort_dir, query) + + # REQUEST + + def get_request_by_id(self, request_id): + query = model_query(models.Request).filter_by(id=request_id) + try: + return query.one() + except NoResultFound: + raise exception.RequestNotFound(request=request_id) + + def get_request_by_uuid(self, request_uuid): + query = model_query(models.Request).filter_by(uuid=request_uuid) + try: + return query.one() + except NoResultFound: + raise exception.RequestNotFound(request=request_uuid) + + def _do_update_request(self, update_id, values): + session = get_session() + with session.begin(): + query = model_query(models.Request, session=session) + query = add_identity_filter(query, update_id) + try: + ref = query.with_lockmode('update').one() + except NoResultFound: + raise exception.RequestNotFound(result=update_id) + ref.update(values) + return ref + + def create_request(self, values): + # ensure defaults are present for new requests + if 'uuid' not in values: + values['uuid'] = uuidutils.generate_uuid() + request = models.Request() + request.update(values) + request.save() + return request + + def update_request(self, request_id, values): + if 'uuid' in values: + msg = _("Cannot overwrite UUID for an existing Request.") + raise exception.InvalidParameterValue(err=msg) + return self._do_update_request(request_id, values) + + # RESULT + + def _do_update_result(self, update_id, values): + session = get_session() + with session.begin(): + query = model_query(models.Result, session=session) + query = add_identity_filter(query, update_id) + try: + ref = query.with_lockmode('update').one() + except NoResultFound: + raise exception.ResultNotFound(result=update_id) + ref.update(values) + return ref + + def create_result(self, values): + # ensure defaults are present for new results + result = models.Result() + result.update(values) + result.save() + return result + + def get_result(self, board_uuid, request_uuid): + query = model_query(models.Result).filter_by( + board_uuid=board_uuid).filter_by(request_uuid=request_uuid) + try: + return query.one() + except NoResultFound: + raise exception.ResultNotFound() + + def update_result(self, result_id, values): + return self._do_update_result(result_id, values) + + def get_results(self, request_uuid, filters=None): + query = model_query(models.Result).filter_by( + request_uuid=request_uuid) + query = self._add_result_filters(query, filters) + try: + return query.all() + except NoResultFound: + raise exception.ResultNotFound() diff --git a/iotronic/db/sqlalchemy/models.py b/iotronic/db/sqlalchemy/models.py index 2170c62..50b9290 100644 --- a/iotronic/db/sqlalchemy/models.py +++ b/iotronic/db/sqlalchemy/models.py @@ -314,3 +314,35 @@ class EnabledWebservice(Base): dns = Column(String(100)) zone = Column(String(100)) extra = Column(JSONEncodedDict) + + +class Request(Base): + """Represents a request.""" + + __tablename__ = 'requests' + __table_args__ = ( + schema.UniqueConstraint('uuid', name='uniq_requests0uuid'), + + table_args()) + id = Column(Integer, primary_key=True) + uuid = Column(String(36)) + + destination_uuid = Column(String(36)) + status = Column(String(10)) + type = Column(Integer) + action = Column(String(15)) + + +class Result(Base): + """Represents a result.""" + + __tablename__ = 'results' + __table_args__ = ( + schema.UniqueConstraint('board_uuid', 'request_uuid', + name='uniq_request_on_board'), + table_args()) + id = Column(Integer, primary_key=True) + board_uuid = Column(String(36)) + request_uuid = Column(String(36)) + result = Column(String(10)) + message = Column(TEXT) diff --git a/iotronic/objects/__init__.py b/iotronic/objects/__init__.py index 3316002..dc1d2cb 100644 --- a/iotronic/objects/__init__.py +++ b/iotronic/objects/__init__.py @@ -21,6 +21,8 @@ from iotronic.objects import injectionplugin from iotronic.objects import location from iotronic.objects import plugin from iotronic.objects import port +from iotronic.objects import request +from iotronic.objects import result from iotronic.objects import service from iotronic.objects import sessionwp from iotronic.objects import wampagent @@ -36,6 +38,8 @@ SessionWP = sessionwp.SessionWP WampAgent = wampagent.WampAgent Service = service.Service Webservice = webservice.Webservice +Request = request.Request +Result = result.Result Port = port.Port Fleet = fleet.Fleet EnabledWebservice = enabledwebservice.EnabledWebservice @@ -53,5 +57,7 @@ __all__ = ( Port, Fleet, Webservice, - EnabledWebservice + EnabledWebservice, + Request, + Result, ) diff --git a/iotronic/objects/request.py b/iotronic/objects/request.py new file mode 100644 index 0000000..c754705 --- /dev/null +++ b/iotronic/objects/request.py @@ -0,0 +1,195 @@ +# coding=utf-8 +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_utils import strutils +from oslo_utils import uuidutils + +from iotronic.common import exception +from iotronic.db import api as db_api +from iotronic.objects import base +from iotronic.objects.result import Result +from iotronic.objects import utils as obj_utils + +BOARD = 0 +FLOAT = 1 + +COMPLETED = "COMPLETED" +PENDING = "PENDING" + + +class Request(base.IotronicObject): + # Version 1.0: Initial version + VERSION = '1.0' + + dbapi = db_api.get_instance() + + fields = { + 'id': int, + 'uuid': obj_utils.str_or_none, + 'destination_uuid': obj_utils.str_or_none, + 'status': obj_utils.str_or_none, + 'type': int, + 'action': obj_utils.str_or_none, + } + + @staticmethod + def _from_db_object(request, db_request): + """Converts a database entity to a formal object.""" + for field in request.fields: + request[field] = db_request[field] + request.obj_reset_changes() + return request + + @base.remotable_classmethod + def get(cls, context, request_id): + """Find a request based on its id or uuid and return a Board object. + + :param request_id: the id *or* uuid of a request. + :returns: a :class:`Board` object. + """ + if strutils.is_int_like(request_id): + return cls.get_by_id(context, request_id) + elif uuidutils.is_uuid_like(request_id): + return cls.get_by_uuid(context, request_id) + else: + raise exception.InvalidIdentity(identity=request_id) + + @base.remotable_classmethod + def get_by_id(cls, context, request_id): + """Find a request based on its integer id and return a Board object. + + :param request_id: the id of a request. + :returns: a :class:`Board` object. + """ + db_request = cls.dbapi.get_request_by_id(request_id) + request = Request._from_db_object(cls(context), db_request) + return request + + @base.remotable_classmethod + def get_by_uuid(cls, context, uuid): + """Find a request based on uuid and return a Board object. + + :param uuid: the uuid of a request. + :returns: a :class:`Board` object. + """ + db_request = cls.dbapi.get_request_by_uuid(uuid) + request = Request._from_db_object(cls(context), db_request) + return request + + @base.remotable_classmethod + def get_results(cls, context, request_uuid, filters=None): + """Find a request based on uuid and return a Board object. + + :param uuid: the uuid of a request. + :returns: a :class:`Board` object. + """ + return Result.get_results_list(context, + request_uuid, filters) + + # @base.remotable_classmethod + # def get_results_request(cls,context,request_uuid): + # db_requests = cls.dbapi.get_results(request_uuid) + # return [Result._from_db_object(cls(context), obj) + # for obj in db_requests] + + # @base.remotable_classmethod + # def get_by_name(cls, context, name): + # """Find a request based on name and return a Board object. + # + # :param name: the logical name of a request. + # :returns: a :class:`Board` object. + # """ + # db_request = cls.dbapi.get_request_by_name(name) + # request = Request._from_db_object(cls(context), db_request) + # return request + + # @base.remotable_classmethod + # def list(cls, context, limit=None, marker=None, sort_key=None, + # sort_dir=None, filters=None): + # """Return a list of Request objects. + # + # :param context: Security context. + # :param limit: maximum number of resources to return in a + # single result. + # :param marker: pagination marker for large data sets. + # :param sort_key: column to sort results by. + # :param sort_dir: direction to sort. "asc" or "desc". + # :param filters: Filters to apply. + # :returns: a list of :class:`Request` object. + # + # """ + # db_requests = cls.dbapi.get_request_list(filters=filters, + # limit=limit, + # marker=marker, + # sort_key=sort_key, + # sort_dir=sort_dir) + # return [Request._from_db_object(cls(context), obj) + # for obj in db_requests] + + @base.remotable + def create(self, context=None): + """Create a Request record in the DB. + + Column-wise updates will be made based on the result of + self.what_changed(). If target_power_state is provided, + it will be checked against the in-database copy of the + request before updates are made. + + :param context: Security context. NOTE: This should only + be used internally by the indirection_api. + Unfortunately, RPC requires context as the first + argument, even though we don't use it. + A context should be set when instantiating the + object, e.g.: Request(context) + + """ + + values = self.obj_get_changes() + db_request = self.dbapi.create_request(values) + self._from_db_object(self, db_request) + + # @base.remotable + # def destroy(self, context=None): + # """Delete the Request from the DB. + # + # :param context: Security context. NOTE: This should only + # be used internally by the indirection_api. + # Unfortunately, RPC requires context as the first + # argument, even though we don't use it. + # A context should be set when instantiating the + # object, e.g.: Request(context) + # """ + # self.dbapi.destroy_request(self.uuid) + # self.obj_reset_changes() + + @base.remotable + def save(self, context=None): + """Save updates to this Request. + + Column-wise updates will be made based on the result of + self.what_changed(). If target_power_state is provided, + it will be checked against the in-database copy of the + request before updates are made. + + :param context: Security context. NOTE: This should only + be used internally by the indirection_api. + Unfortunately, RPC requires context as the first + argument, even though we don't use it. + A context should be set when instantiating the + object, e.g.: Request(context) + """ + updates = self.obj_get_changes() + self.dbapi.update_request(self.uuid, updates) + self.obj_reset_changes() diff --git a/iotronic/objects/result.py b/iotronic/objects/result.py new file mode 100644 index 0000000..e23df74 --- /dev/null +++ b/iotronic/objects/result.py @@ -0,0 +1,154 @@ +# coding=utf-8 +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# from oslo_utils import strutils +# from oslo_utils import uuidutils + +# from iotronic.common import exception +from iotronic.db import api as db_api +from iotronic.objects import base +from iotronic.objects import utils as obj_utils + +SUCCESS = "SUCCESS" +ERROR = "ERROR" +WARNING = "WARNING" +RUNNING = "RUNNING" + + +class Result(base.IotronicObject): + # Version 1.0: Initial version + VERSION = '1.0' + + dbapi = db_api.get_instance() + + fields = { + 'id': int, + 'board_uuid': obj_utils.str_or_none, + 'request_uuid': obj_utils.str_or_none, + 'result': obj_utils.str_or_none, + 'message': obj_utils.str_or_none, + } + + @staticmethod + def _from_db_object(result, db_result): + """Converts a database entity to a formal object.""" + for field in result.fields: + result[field] = db_result[field] + result.obj_reset_changes() + return result + + @base.remotable_classmethod + def get(cls, context, board_uuid, request_uuid): + """Find a result based on name and return a Board object. + + :param board_uuid: the board uuid result. + :param request_uuid: the request_uuid. + :returns: a :class:`result` object. + """ + db_result = cls.dbapi.get_result(board_uuid, request_uuid) + result = Result._from_db_object(cls(context), db_result) + return result + + @base.remotable_classmethod + def get_results_list(cls, context, request_uuid, filters=None): + """Find a result based on name and return a Board object. + + :param board_uuid: the board uuid result. + :param request_uuid: the request_uuid. + :returns: a :class:`result` object. + """ + db_requests = cls.dbapi.get_results(request_uuid, + filters=filters) + return [Result._from_db_object(cls(context), obj) + for obj in db_requests] + + # @base.remotable_classmethod + # def list(cls, context, limit=None, marker=None, sort_key=None, + # sort_dir=None, filters=None): + # """Return a list of Result objects. + # + # :param context: Security context. + # :param limit: maximum number of resources to return in a + # single result. + # :param marker: pagination marker for large data sets. + # :param sort_key: column to sort results by. + # :param sort_dir: direction to sort. "asc" or "desc". + # :param filters: Filters to apply. + # :returns: a list of :class:`Result` object. + # + # """ + # db_results = cls.dbapi.get_result_list(filters=filters, + # limit=limit, + # marker=marker, + # sort_key=sort_key, + # sort_dir=sort_dir) + # return [Result._from_db_object(cls(context), obj) + # for obj in db_results] + + @base.remotable + def create(self, context=None): + """Create a Result record in the DB. + + Column-wise updates will be made based on the result of + self.what_changed(). If target_power_state is provided, + it will be checked against the in-database copy of the + result before updates are made. + + :param context: Security context. NOTE: This should only + be used internally by the indirection_api. + Unfortunately, RPC requires context as the first + argument, even though we don't use it. + A context should be set when instantiating the + object, e.g.: Result(context) + + """ + + values = self.obj_get_changes() + db_result = self.dbapi.create_result(values) + self._from_db_object(self, db_result) + + # @base.remotable + # def destroy(self, context=None): + # """Delete the Result from the DB. + # + # :param context: Security context. NOTE: This should only + # be used internally by the indirection_api. + # Unfortunately, RPC requires context as the first + # argument, even though we don't use it. + # A context should be set when instantiating the + # object, e.g.: Result(context) + # """ + # self.dbapi.destroy_result(self.uuid) + # self.obj_reset_changes() + + @base.remotable + def save(self, context=None): + """Save updates to this Result. + + Column-wise updates will be made based on the result of + self.what_changed(). If target_power_state is provided, + it will be checked against the in-database copy of the + result before updates are made. + + :param context: Security context. NOTE: This should only + be used internally by the indirection_api. + Unfortunately, RPC requires context as the first + argument, even though we don't use it. + A context should be set when instantiating the + object, e.g.: Result(context) + """ + updates = self.obj_get_changes() + self.dbapi.update_result(self.id, updates) + self.obj_reset_changes() diff --git a/iotronic/wamp/agent.py b/iotronic/wamp/agent.py index 4b6c6e0..00f5ffa 100644 --- a/iotronic/wamp/agent.py +++ b/iotronic/wamp/agent.py @@ -95,8 +95,19 @@ connected = False async def wamp_request(kwarg): - LOG.debug("calling: " + kwarg['wamp_rpc_call']) - d = await wamp_session_caller.call(kwarg['wamp_rpc_call'], *kwarg['data']) + # for previous LR version (to be removed asap) + if 'req_uuid' in kwarg: + + LOG.debug("calling: " + kwarg['wamp_rpc_call'] + + " with request id: " + kwarg['req_uuid']) + d = await wamp_session_caller.call(kwarg['wamp_rpc_call'], + kwarg['req_uuid'], + *kwarg['data']) + else: + LOG.debug("calling: " + kwarg['wamp_rpc_call']) + d = await wamp_session_caller.call(kwarg['wamp_rpc_call'], + *kwarg['data']) + return d @@ -234,6 +245,8 @@ class WampManager(object): AGENT_HOST + u'.stack4things.alive') session.register(fun.wamp_alive, AGENT_HOST + u'.stack4things.wamp_alive') + session.register(fun.notify_result, + AGENT_HOST + u'.stack4things.notify_result') LOG.debug("procedure registered") except Exception as e: diff --git a/iotronic/wamp/functions.py b/iotronic/wamp/functions.py index a47d81d..536cf27 100644 --- a/iotronic/wamp/functions.py +++ b/iotronic/wamp/functions.py @@ -175,3 +175,26 @@ def registration(code, session): def board_on_join(session_id): LOG.debug('A board with %s joined', session_id['session']) + + +def notify_result(board_uuid, wampmessage): + wmsg = wm.deserialize(wampmessage) + LOG.info('Board %s completed the its request %s with result: %s', + board_uuid, wmsg.req_id, wmsg.result) + + res = objects.Result.get(ctxt, board_uuid, wmsg.req_id) + res.result = wmsg.result + res.message = wmsg.message + res.save() + + filter = {"result": objects.result.RUNNING} + + list_result = objects.Request.get_results(ctxt, + wmsg.req_id, + filter) + if len(list_result) == 0: + req = objects.Request.get_by_uuid(ctxt, wmsg.req_id) + req.status = objects.request.COMPLETED + req.save() + + return wm.WampSuccess('notification_received').serialize() diff --git a/iotronic/wamp/wampmessage.py b/iotronic/wamp/wampmessage.py index f6a2dff..3aac6b5 100644 --- a/iotronic/wamp/wampmessage.py +++ b/iotronic/wamp/wampmessage.py @@ -19,6 +19,7 @@ import json SUCCESS = 'SUCCESS' ERROR = 'ERROR' WARNING = 'WARNING' +RUNNING = 'RUNNING' def deserialize(received): @@ -27,24 +28,30 @@ def deserialize(received): class WampMessage(object): - def __init__(self, message=None, result=None): + def __init__(self, message, result, req_id): self.message = message self.result = result + self.req_id = req_id def serialize(self): return json.dumps(self, default=lambda o: o.__dict__) class WampSuccess(WampMessage): - def __init__(self, msg=None): - super(WampSuccess, self).__init__(msg, SUCCESS) + def __init__(self, msg=None, req_id=None): + super(WampSuccess, self).__init__(msg, SUCCESS, req_id) class WampError(WampMessage): - def __init__(self, msg=None): - super(WampError, self).__init__(msg, ERROR) + def __init__(self, msg=None, req_id=None): + super(WampError, self).__init__(msg, ERROR, req_id) class WampWarning(WampMessage): - def __init__(self, msg=None): - super(WampWarning, self).__init__(msg, WARNING) + def __init__(self, msg=None, req_id=None): + super(WampWarning, self).__init__(msg, WARNING, req_id) + + +class WampRunning(WampMessage): + def __init__(self, msg=None, req_id=None): + super(WampRunning, self).__init__(msg, RUNNING, req_id)