added wamp agent model on the db

Change-Id: I2d7fc4f80cff135fd58dd4642fcb268dff92b0f8
This commit is contained in:
Fabio Verboso 2017-02-07 14:23:27 +01:00
parent 8d762aa9fb
commit 45f7e53590
7 changed files with 154 additions and 22 deletions

View File

@ -23,4 +23,4 @@ from iotronic.wamp.agent import WampAgent
if __name__ == '__main__':
wa=WampAgent()
wa=WampAgent('agent')

View File

@ -29,7 +29,6 @@ import six
from iotronic.common.i18n import _
from iotronic.common.i18n import _LE
LOG = logging.getLogger(__name__)
exc_log_opts = [
@ -290,6 +289,14 @@ class ConductorAlreadyRegistered(IotronicException):
message = _("Conductor %(conductor)s already registered.")
class WampAgentNotFound(NotFound):
message = _("WampAgent %(wampagent)s could not be found.")
class WampAgentAlreadyRegistered(IotronicException):
message = _("WampAgent %(wampagent)s already registered.")
class PowerStateFailure(InvalidState):
message = _("Failed to set node power state to %(pstate)s.")

View File

@ -23,7 +23,6 @@ from oslo_config import cfg
from oslo_db import api as db_api
import six
_BACKEND_MAPPING = {'sqlalchemy': 'iotronic.db.sqlalchemy.api'}
IMPL = db_api.DBAPI.from_config(cfg.CONF, backend_mapping=_BACKEND_MAPPING,
lazy=True)
@ -241,3 +240,28 @@ class Connection(object):
(asc, desc)
:returns: A list of locations.
"""
@abc.abstractmethod
def get_wampagent(self, hostname):
"""Retrieve a wampagent's service record from the database.
:param hostname: The hostname of the wampagent service.
:returns: A wampagent.
:raises: WampAgentNotFound
"""
@abc.abstractmethod
def unregister_wampagent(self, hostname):
"""Remove this wampagent from the service registry immediately.
:param hostname: The hostname of this wampagent service.
:raises: WampAgentNotFound
"""
@abc.abstractmethod
def touch_wampagent(self, hostname):
"""Mark a wampagent as active by updating its 'updated_at' property.
:param hostname: The hostname of this wampagent service.
:raises: WampAgentNotFound
"""

View File

@ -39,7 +39,6 @@ CONF.import_opt('heartbeat_timeout',
LOG = log.getLogger(__name__)
_FACADE = None
@ -268,11 +267,11 @@ class Connection(api.Connection):
values['inspection_started_at'] = timeutils.utcnow()
values['inspection_finished_at'] = None
elif (ref.provision_state == states.INSPECTING and
values['provision_state'] == states.MANAGEABLE):
values['provision_state'] == states.MANAGEABLE):
values['inspection_finished_at'] = timeutils.utcnow()
values['inspection_started_at'] = None
elif (ref.provision_state == states.INSPECTING and
values['provision_state'] == states.INSPECTFAIL):
values['provision_state'] == states.INSPECTFAIL):
values['inspection_started_at'] = None
ref.update(values)
@ -397,3 +396,52 @@ class Connection(api.Connection):
return query.one()
except NoResultFound:
return None
def register_wampagent(self, values, update_existing=False):
session = get_session()
with session.begin():
query = (model_query(models.WampAgent, session=session)
.filter_by(hostname=values['hostname']))
try:
ref = query.one()
if ref.online is True and not update_existing:
raise exception.WampAgentAlreadyRegistered(
wampagent=values['hostname'])
except NoResultFound:
ref = models.WampAgent()
ref.update(values)
# always set online and updated_at fields when registering
# a wampagent, especially when updating an existing one
ref.update({'updated_at': timeutils.utcnow(),
'online': True})
ref.save(session)
return ref
def get_wampagent(self, hostname):
try:
return (model_query(models.WampAgent)
.filter_by(hostname=hostname, online=True)
.one())
except NoResultFound:
raise exception.WampAgentNotFound(wampagent=hostname)
def unregister_wampagent(self, hostname):
session = get_session()
with session.begin():
query = (model_query(models.WampAgent, session=session)
.filter_by(hostname=hostname, online=True))
count = query.update({'online': False})
if count == 0:
raise exception.WampAgentNotFound(wampagent=hostname)
def touch_wampagent(self, hostname):
session = get_session()
with session.begin():
query = (model_query(models.WampAgent, session=session)
.filter_by(hostname=hostname))
# since we're not changing any other field, manually set updated_at
# and since we're heartbeating, make sure that online=True
count = query.update({'updated_at': timeutils.utcnow(),
'online': True})
if count == 0:
raise exception.WampAgentNotFound(wampagent=hostname)

View File

@ -33,7 +33,6 @@ from sqlalchemy import String
from sqlalchemy.types import TypeDecorator, TEXT
from iotronic.common import paths
sql_opts = [
cfg.StrOpt('mysql_engine',
default='InnoDB',
@ -41,8 +40,7 @@ sql_opts = [
]
_DEFAULT_SQL_CONNECTION = 'sqlite:///' + \
paths.state_path_def('iotronic.sqlite')
paths.state_path_def('iotronic.sqlite')
cfg.CONF.register_opts(sql_opts, 'database')
db_options.set_defaults(cfg.CONF, _DEFAULT_SQL_CONNECTION, 'iotronic.sqlite')
@ -92,7 +90,6 @@ class JSONEncodedList(JsonEncodedType):
class IotronicBase(models.TimestampMixin,
models.ModelBase):
metadata = None
def as_dict(self):
@ -109,6 +106,7 @@ class IotronicBase(models.TimestampMixin,
super(IotronicBase, self).save(session)
Base = declarative_base(cls=IotronicBase)
@ -125,6 +123,19 @@ class Conductor(Base):
online = Column(Boolean, default=True)
class WampAgent(Base):
"""Represents a wampagent service entry."""
__tablename__ = 'wampagents'
__table_args__ = (
schema.UniqueConstraint('hostname', name='uniq_wampagentss0hostname'),
table_args()
)
id = Column(Integer, primary_key=True)
hostname = Column(String(255), nullable=False)
online = Column(Boolean, default=True)
class Node(Base):
"""Represents a Node."""

View File

@ -16,6 +16,9 @@
from autobahn.twisted import wamp
from autobahn.twisted import websocket
from autobahn.wamp import types
from iotronic.common import exception
from iotronic.common.i18n import _LW
from iotronic.db import api as dbapi
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
@ -24,7 +27,6 @@ from threading import Thread
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.internet import reactor
LOG = logging.getLogger(__name__)
wamp_opts = [
@ -41,7 +43,7 @@ CONF.register_opts(wamp_opts, 'wamp')
shared_result = {}
wamp_session_caller = None
AGENT_UUID = None
AGENT_HOST = None
def wamp_request(e, kwarg, session):
@ -92,7 +94,7 @@ class WampEndpoint(object):
class WampFrontend(wamp.ApplicationSession):
def onJoin(self, details):
global wamp_session_caller, AGENT_UUID
global wamp_session_caller, AGENT_HOST
wamp_session_caller = self
import iotronic.wamp.registerd_functions as fun
@ -101,7 +103,7 @@ class WampFrontend(wamp.ApplicationSession):
try:
self.register(fun.registration, u'stack4things.register')
self.register(fun.echo, AGENT_UUID + u'.stack4things.echo')
self.register(fun.echo, AGENT_HOST + u'.stack4things.echo')
LOG.info("procedure registered")
except Exception as e:
LOG.error("could not register procedure: {0}".format(e))
@ -130,16 +132,17 @@ class WampClientFactory(websocket.WampWebSocketClientFactory,
class RPCServer(Thread):
def __init__(self, agent_uuid):
def __init__(self):
global AGENT_HOST
# AMQP CONFIG
endpoints = [
WampEndpoint(WampFrontend, agent_uuid),
WampEndpoint(WampFrontend, AGENT_HOST),
]
Thread.__init__(self)
transport = oslo_messaging.get_transport(CONF)
target = oslo_messaging.Target(topic=agent_uuid + '.s4t_invoke_wamp',
target = oslo_messaging.Target(topic=AGENT_HOST + '.s4t_invoke_wamp',
server='server1')
self.server = oslo_messaging.get_rpc_server(transport,
@ -184,16 +187,38 @@ class WampManager(object):
class WampAgent(object):
def __init__(self):
def __init__(self, host):
logging.register_options(CONF)
CONF(project='iotronic')
logging.setup(CONF, "iotronic-wamp-agent")
agent_uuid = 'agent'
global AGENT_UUID
AGENT_UUID = agent_uuid
# to be removed asap
self.host = host
self.dbapi = dbapi.get_instance()
r = RPCServer(agent_uuid)
try:
wpa = self.dbapi.register_wampagent(
{'hostname': self.host})
except exception.ConductorAlreadyRegistered:
LOG.warn(_LW("A conductor with hostname %(hostname)s "
"was previously registered. Updating registration"),
{'hostname': self.host})
except exception.WampAgentAlreadyRegistered:
LOG.warn(_LW("A wampagent with hostname %(hostname)s "
"was previously registered. Updating registration"),
{'hostname': self.host})
wpa = self.dbapi.register_wampagent({'hostname': self.host},
update_existing=True)
self.wampagent = wpa
global AGENT_HOST
AGENT_HOST = self.host
r = RPCServer()
w = WampManager()
try:

View File

@ -35,6 +35,23 @@ ENGINE = InnoDB
AUTO_INCREMENT = 6
DEFAULT CHARACTER SET = utf8;
-- -----------------------------------------------------
-- Table `iotronic`.`wampagents`
-- -----------------------------------------------------
DROP TABLE IF EXISTS `iotronic`.`wampagents` ;
CREATE TABLE IF NOT EXISTS `iotronic`.`wampagents` (
`created_at` DATETIME NULL DEFAULT NULL,
`updated_at` DATETIME NULL DEFAULT NULL,
`id` INT(11) NOT NULL AUTO_INCREMENT,
`hostname` VARCHAR(255) NOT NULL,
`online` TINYINT(1) NULL DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE INDEX `uniq_wampagents0hostname` (`hostname` ASC))
ENGINE = InnoDB
AUTO_INCREMENT = 6
DEFAULT CHARACTER SET = utf8;
-- -----------------------------------------------------
-- Table `iotronic`.`nodes`