diff --git a/bin/iotronic-wamp-agent b/bin/iotronic-wamp-agent index 3b1af79..f9de64f 100755 --- a/bin/iotronic-wamp-agent +++ b/bin/iotronic-wamp-agent @@ -23,4 +23,4 @@ from iotronic.wamp.agent import WampAgent if __name__ == '__main__': - wa=WampAgent() + wa=WampAgent('agent') diff --git a/iotronic/common/exception.py b/iotronic/common/exception.py index 1359db1..e935bc1 100644 --- a/iotronic/common/exception.py +++ b/iotronic/common/exception.py @@ -29,7 +29,6 @@ import six from iotronic.common.i18n import _ from iotronic.common.i18n import _LE - LOG = logging.getLogger(__name__) exc_log_opts = [ @@ -290,6 +289,14 @@ class ConductorAlreadyRegistered(IotronicException): message = _("Conductor %(conductor)s already registered.") +class WampAgentNotFound(NotFound): + message = _("WampAgent %(wampagent)s could not be found.") + + +class WampAgentAlreadyRegistered(IotronicException): + message = _("WampAgent %(wampagent)s already registered.") + + class PowerStateFailure(InvalidState): message = _("Failed to set node power state to %(pstate)s.") diff --git a/iotronic/db/api.py b/iotronic/db/api.py index 5d2ff44..86ac764 100644 --- a/iotronic/db/api.py +++ b/iotronic/db/api.py @@ -23,7 +23,6 @@ from oslo_config import cfg from oslo_db import api as db_api import six - _BACKEND_MAPPING = {'sqlalchemy': 'iotronic.db.sqlalchemy.api'} IMPL = db_api.DBAPI.from_config(cfg.CONF, backend_mapping=_BACKEND_MAPPING, lazy=True) @@ -241,3 +240,28 @@ class Connection(object): (asc, desc) :returns: A list of locations. """ + + @abc.abstractmethod + def get_wampagent(self, hostname): + """Retrieve a wampagent's service record from the database. + + :param hostname: The hostname of the wampagent service. + :returns: A wampagent. + :raises: WampAgentNotFound + """ + + @abc.abstractmethod + def unregister_wampagent(self, hostname): + """Remove this wampagent from the service registry immediately. + + :param hostname: The hostname of this wampagent service. + :raises: WampAgentNotFound + """ + + @abc.abstractmethod + def touch_wampagent(self, hostname): + """Mark a wampagent as active by updating its 'updated_at' property. + + :param hostname: The hostname of this wampagent service. + :raises: WampAgentNotFound + """ diff --git a/iotronic/db/sqlalchemy/api.py b/iotronic/db/sqlalchemy/api.py index df8b930..f043e0f 100644 --- a/iotronic/db/sqlalchemy/api.py +++ b/iotronic/db/sqlalchemy/api.py @@ -39,7 +39,6 @@ CONF.import_opt('heartbeat_timeout', LOG = log.getLogger(__name__) - _FACADE = None @@ -268,11 +267,11 @@ class Connection(api.Connection): values['inspection_started_at'] = timeutils.utcnow() values['inspection_finished_at'] = None elif (ref.provision_state == states.INSPECTING and - values['provision_state'] == states.MANAGEABLE): + values['provision_state'] == states.MANAGEABLE): values['inspection_finished_at'] = timeutils.utcnow() values['inspection_started_at'] = None elif (ref.provision_state == states.INSPECTING and - values['provision_state'] == states.INSPECTFAIL): + values['provision_state'] == states.INSPECTFAIL): values['inspection_started_at'] = None ref.update(values) @@ -397,3 +396,52 @@ class Connection(api.Connection): return query.one() except NoResultFound: return None + + def register_wampagent(self, values, update_existing=False): + session = get_session() + with session.begin(): + query = (model_query(models.WampAgent, session=session) + .filter_by(hostname=values['hostname'])) + try: + ref = query.one() + if ref.online is True and not update_existing: + raise exception.WampAgentAlreadyRegistered( + wampagent=values['hostname']) + except NoResultFound: + ref = models.WampAgent() + ref.update(values) + # always set online and updated_at fields when registering + # a wampagent, especially when updating an existing one + ref.update({'updated_at': timeutils.utcnow(), + 'online': True}) + ref.save(session) + return ref + + def get_wampagent(self, hostname): + try: + return (model_query(models.WampAgent) + .filter_by(hostname=hostname, online=True) + .one()) + except NoResultFound: + raise exception.WampAgentNotFound(wampagent=hostname) + + def unregister_wampagent(self, hostname): + session = get_session() + with session.begin(): + query = (model_query(models.WampAgent, session=session) + .filter_by(hostname=hostname, online=True)) + count = query.update({'online': False}) + if count == 0: + raise exception.WampAgentNotFound(wampagent=hostname) + + def touch_wampagent(self, hostname): + session = get_session() + with session.begin(): + query = (model_query(models.WampAgent, session=session) + .filter_by(hostname=hostname)) + # since we're not changing any other field, manually set updated_at + # and since we're heartbeating, make sure that online=True + count = query.update({'updated_at': timeutils.utcnow(), + 'online': True}) + if count == 0: + raise exception.WampAgentNotFound(wampagent=hostname) diff --git a/iotronic/db/sqlalchemy/models.py b/iotronic/db/sqlalchemy/models.py index e05c614..0a3aa75 100644 --- a/iotronic/db/sqlalchemy/models.py +++ b/iotronic/db/sqlalchemy/models.py @@ -33,7 +33,6 @@ from sqlalchemy import String from sqlalchemy.types import TypeDecorator, TEXT from iotronic.common import paths - sql_opts = [ cfg.StrOpt('mysql_engine', default='InnoDB', @@ -41,8 +40,7 @@ sql_opts = [ ] _DEFAULT_SQL_CONNECTION = 'sqlite:///' + \ - paths.state_path_def('iotronic.sqlite') - + paths.state_path_def('iotronic.sqlite') cfg.CONF.register_opts(sql_opts, 'database') db_options.set_defaults(cfg.CONF, _DEFAULT_SQL_CONNECTION, 'iotronic.sqlite') @@ -92,7 +90,6 @@ class JSONEncodedList(JsonEncodedType): class IotronicBase(models.TimestampMixin, models.ModelBase): - metadata = None def as_dict(self): @@ -109,6 +106,7 @@ class IotronicBase(models.TimestampMixin, super(IotronicBase, self).save(session) + Base = declarative_base(cls=IotronicBase) @@ -125,6 +123,19 @@ class Conductor(Base): online = Column(Boolean, default=True) +class WampAgent(Base): + """Represents a wampagent service entry.""" + + __tablename__ = 'wampagents' + __table_args__ = ( + schema.UniqueConstraint('hostname', name='uniq_wampagentss0hostname'), + table_args() + ) + id = Column(Integer, primary_key=True) + hostname = Column(String(255), nullable=False) + online = Column(Boolean, default=True) + + class Node(Base): """Represents a Node.""" diff --git a/iotronic/wamp/agent.py b/iotronic/wamp/agent.py index d071c73..fbeceac 100644 --- a/iotronic/wamp/agent.py +++ b/iotronic/wamp/agent.py @@ -16,6 +16,9 @@ from autobahn.twisted import wamp from autobahn.twisted import websocket from autobahn.wamp import types +from iotronic.common import exception +from iotronic.common.i18n import _LW +from iotronic.db import api as dbapi from oslo_config import cfg from oslo_log import log as logging import oslo_messaging @@ -24,7 +27,6 @@ from threading import Thread from twisted.internet.protocol import ReconnectingClientFactory from twisted.internet import reactor - LOG = logging.getLogger(__name__) wamp_opts = [ @@ -41,7 +43,7 @@ CONF.register_opts(wamp_opts, 'wamp') shared_result = {} wamp_session_caller = None -AGENT_UUID = None +AGENT_HOST = None def wamp_request(e, kwarg, session): @@ -92,7 +94,7 @@ class WampEndpoint(object): class WampFrontend(wamp.ApplicationSession): def onJoin(self, details): - global wamp_session_caller, AGENT_UUID + global wamp_session_caller, AGENT_HOST wamp_session_caller = self import iotronic.wamp.registerd_functions as fun @@ -101,7 +103,7 @@ class WampFrontend(wamp.ApplicationSession): try: self.register(fun.registration, u'stack4things.register') - self.register(fun.echo, AGENT_UUID + u'.stack4things.echo') + self.register(fun.echo, AGENT_HOST + u'.stack4things.echo') LOG.info("procedure registered") except Exception as e: LOG.error("could not register procedure: {0}".format(e)) @@ -130,16 +132,17 @@ class WampClientFactory(websocket.WampWebSocketClientFactory, class RPCServer(Thread): - def __init__(self, agent_uuid): + def __init__(self): + global AGENT_HOST # AMQP CONFIG endpoints = [ - WampEndpoint(WampFrontend, agent_uuid), + WampEndpoint(WampFrontend, AGENT_HOST), ] Thread.__init__(self) transport = oslo_messaging.get_transport(CONF) - target = oslo_messaging.Target(topic=agent_uuid + '.s4t_invoke_wamp', + target = oslo_messaging.Target(topic=AGENT_HOST + '.s4t_invoke_wamp', server='server1') self.server = oslo_messaging.get_rpc_server(transport, @@ -184,16 +187,38 @@ class WampManager(object): class WampAgent(object): - def __init__(self): + def __init__(self, host): + logging.register_options(CONF) CONF(project='iotronic') logging.setup(CONF, "iotronic-wamp-agent") - agent_uuid = 'agent' - global AGENT_UUID - AGENT_UUID = agent_uuid + # to be removed asap + self.host = host + self.dbapi = dbapi.get_instance() - r = RPCServer(agent_uuid) + try: + wpa = self.dbapi.register_wampagent( + {'hostname': self.host}) + + except exception.ConductorAlreadyRegistered: + LOG.warn(_LW("A conductor with hostname %(hostname)s " + "was previously registered. Updating registration"), + {'hostname': self.host}) + + except exception.WampAgentAlreadyRegistered: + LOG.warn(_LW("A wampagent with hostname %(hostname)s " + "was previously registered. Updating registration"), + {'hostname': self.host}) + + wpa = self.dbapi.register_wampagent({'hostname': self.host}, + update_existing=True) + self.wampagent = wpa + + global AGENT_HOST + AGENT_HOST = self.host + + r = RPCServer() w = WampManager() try: diff --git a/utils/iotronic.sql b/utils/iotronic.sql index 888b5a5..a28f63f 100644 --- a/utils/iotronic.sql +++ b/utils/iotronic.sql @@ -35,6 +35,23 @@ ENGINE = InnoDB AUTO_INCREMENT = 6 DEFAULT CHARACTER SET = utf8; +-- ----------------------------------------------------- +-- Table `iotronic`.`wampagents` +-- ----------------------------------------------------- +DROP TABLE IF EXISTS `iotronic`.`wampagents` ; + +CREATE TABLE IF NOT EXISTS `iotronic`.`wampagents` ( + `created_at` DATETIME NULL DEFAULT NULL, + `updated_at` DATETIME NULL DEFAULT NULL, + `id` INT(11) NOT NULL AUTO_INCREMENT, + `hostname` VARCHAR(255) NOT NULL, + `online` TINYINT(1) NULL DEFAULT NULL, + PRIMARY KEY (`id`), + UNIQUE INDEX `uniq_wampagents0hostname` (`hostname` ASC)) +ENGINE = InnoDB +AUTO_INCREMENT = 6 +DEFAULT CHARACTER SET = utf8; + -- ----------------------------------------------------- -- Table `iotronic`.`nodes`