added wamp agent model on the db
Change-Id: I2d7fc4f80cff135fd58dd4642fcb268dff92b0f8
This commit is contained in:
parent
8d762aa9fb
commit
45f7e53590
|
@ -23,4 +23,4 @@ from iotronic.wamp.agent import WampAgent
|
|||
|
||||
if __name__ == '__main__':
|
||||
|
||||
wa=WampAgent()
|
||||
wa=WampAgent('agent')
|
||||
|
|
|
@ -29,7 +29,6 @@ import six
|
|||
from iotronic.common.i18n import _
|
||||
from iotronic.common.i18n import _LE
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
exc_log_opts = [
|
||||
|
@ -290,6 +289,14 @@ class ConductorAlreadyRegistered(IotronicException):
|
|||
message = _("Conductor %(conductor)s already registered.")
|
||||
|
||||
|
||||
class WampAgentNotFound(NotFound):
|
||||
message = _("WampAgent %(wampagent)s could not be found.")
|
||||
|
||||
|
||||
class WampAgentAlreadyRegistered(IotronicException):
|
||||
message = _("WampAgent %(wampagent)s already registered.")
|
||||
|
||||
|
||||
class PowerStateFailure(InvalidState):
|
||||
message = _("Failed to set node power state to %(pstate)s.")
|
||||
|
||||
|
|
|
@ -23,7 +23,6 @@ from oslo_config import cfg
|
|||
from oslo_db import api as db_api
|
||||
import six
|
||||
|
||||
|
||||
_BACKEND_MAPPING = {'sqlalchemy': 'iotronic.db.sqlalchemy.api'}
|
||||
IMPL = db_api.DBAPI.from_config(cfg.CONF, backend_mapping=_BACKEND_MAPPING,
|
||||
lazy=True)
|
||||
|
@ -241,3 +240,28 @@ class Connection(object):
|
|||
(asc, desc)
|
||||
:returns: A list of locations.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_wampagent(self, hostname):
|
||||
"""Retrieve a wampagent's service record from the database.
|
||||
|
||||
:param hostname: The hostname of the wampagent service.
|
||||
:returns: A wampagent.
|
||||
:raises: WampAgentNotFound
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def unregister_wampagent(self, hostname):
|
||||
"""Remove this wampagent from the service registry immediately.
|
||||
|
||||
:param hostname: The hostname of this wampagent service.
|
||||
:raises: WampAgentNotFound
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def touch_wampagent(self, hostname):
|
||||
"""Mark a wampagent as active by updating its 'updated_at' property.
|
||||
|
||||
:param hostname: The hostname of this wampagent service.
|
||||
:raises: WampAgentNotFound
|
||||
"""
|
||||
|
|
|
@ -39,7 +39,6 @@ CONF.import_opt('heartbeat_timeout',
|
|||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
_FACADE = None
|
||||
|
||||
|
||||
|
@ -268,11 +267,11 @@ class Connection(api.Connection):
|
|||
values['inspection_started_at'] = timeutils.utcnow()
|
||||
values['inspection_finished_at'] = None
|
||||
elif (ref.provision_state == states.INSPECTING and
|
||||
values['provision_state'] == states.MANAGEABLE):
|
||||
values['provision_state'] == states.MANAGEABLE):
|
||||
values['inspection_finished_at'] = timeutils.utcnow()
|
||||
values['inspection_started_at'] = None
|
||||
elif (ref.provision_state == states.INSPECTING and
|
||||
values['provision_state'] == states.INSPECTFAIL):
|
||||
values['provision_state'] == states.INSPECTFAIL):
|
||||
values['inspection_started_at'] = None
|
||||
|
||||
ref.update(values)
|
||||
|
@ -397,3 +396,52 @@ class Connection(api.Connection):
|
|||
return query.one()
|
||||
except NoResultFound:
|
||||
return None
|
||||
|
||||
def register_wampagent(self, values, update_existing=False):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = (model_query(models.WampAgent, session=session)
|
||||
.filter_by(hostname=values['hostname']))
|
||||
try:
|
||||
ref = query.one()
|
||||
if ref.online is True and not update_existing:
|
||||
raise exception.WampAgentAlreadyRegistered(
|
||||
wampagent=values['hostname'])
|
||||
except NoResultFound:
|
||||
ref = models.WampAgent()
|
||||
ref.update(values)
|
||||
# always set online and updated_at fields when registering
|
||||
# a wampagent, especially when updating an existing one
|
||||
ref.update({'updated_at': timeutils.utcnow(),
|
||||
'online': True})
|
||||
ref.save(session)
|
||||
return ref
|
||||
|
||||
def get_wampagent(self, hostname):
|
||||
try:
|
||||
return (model_query(models.WampAgent)
|
||||
.filter_by(hostname=hostname, online=True)
|
||||
.one())
|
||||
except NoResultFound:
|
||||
raise exception.WampAgentNotFound(wampagent=hostname)
|
||||
|
||||
def unregister_wampagent(self, hostname):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = (model_query(models.WampAgent, session=session)
|
||||
.filter_by(hostname=hostname, online=True))
|
||||
count = query.update({'online': False})
|
||||
if count == 0:
|
||||
raise exception.WampAgentNotFound(wampagent=hostname)
|
||||
|
||||
def touch_wampagent(self, hostname):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
query = (model_query(models.WampAgent, session=session)
|
||||
.filter_by(hostname=hostname))
|
||||
# since we're not changing any other field, manually set updated_at
|
||||
# and since we're heartbeating, make sure that online=True
|
||||
count = query.update({'updated_at': timeutils.utcnow(),
|
||||
'online': True})
|
||||
if count == 0:
|
||||
raise exception.WampAgentNotFound(wampagent=hostname)
|
||||
|
|
|
@ -33,7 +33,6 @@ from sqlalchemy import String
|
|||
from sqlalchemy.types import TypeDecorator, TEXT
|
||||
from iotronic.common import paths
|
||||
|
||||
|
||||
sql_opts = [
|
||||
cfg.StrOpt('mysql_engine',
|
||||
default='InnoDB',
|
||||
|
@ -41,8 +40,7 @@ sql_opts = [
|
|||
]
|
||||
|
||||
_DEFAULT_SQL_CONNECTION = 'sqlite:///' + \
|
||||
paths.state_path_def('iotronic.sqlite')
|
||||
|
||||
paths.state_path_def('iotronic.sqlite')
|
||||
|
||||
cfg.CONF.register_opts(sql_opts, 'database')
|
||||
db_options.set_defaults(cfg.CONF, _DEFAULT_SQL_CONNECTION, 'iotronic.sqlite')
|
||||
|
@ -92,7 +90,6 @@ class JSONEncodedList(JsonEncodedType):
|
|||
|
||||
class IotronicBase(models.TimestampMixin,
|
||||
models.ModelBase):
|
||||
|
||||
metadata = None
|
||||
|
||||
def as_dict(self):
|
||||
|
@ -109,6 +106,7 @@ class IotronicBase(models.TimestampMixin,
|
|||
|
||||
super(IotronicBase, self).save(session)
|
||||
|
||||
|
||||
Base = declarative_base(cls=IotronicBase)
|
||||
|
||||
|
||||
|
@ -125,6 +123,19 @@ class Conductor(Base):
|
|||
online = Column(Boolean, default=True)
|
||||
|
||||
|
||||
class WampAgent(Base):
|
||||
"""Represents a wampagent service entry."""
|
||||
|
||||
__tablename__ = 'wampagents'
|
||||
__table_args__ = (
|
||||
schema.UniqueConstraint('hostname', name='uniq_wampagentss0hostname'),
|
||||
table_args()
|
||||
)
|
||||
id = Column(Integer, primary_key=True)
|
||||
hostname = Column(String(255), nullable=False)
|
||||
online = Column(Boolean, default=True)
|
||||
|
||||
|
||||
class Node(Base):
|
||||
"""Represents a Node."""
|
||||
|
||||
|
|
|
@ -16,6 +16,9 @@
|
|||
from autobahn.twisted import wamp
|
||||
from autobahn.twisted import websocket
|
||||
from autobahn.wamp import types
|
||||
from iotronic.common import exception
|
||||
from iotronic.common.i18n import _LW
|
||||
from iotronic.db import api as dbapi
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
|
@ -24,7 +27,6 @@ from threading import Thread
|
|||
from twisted.internet.protocol import ReconnectingClientFactory
|
||||
from twisted.internet import reactor
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
wamp_opts = [
|
||||
|
@ -41,7 +43,7 @@ CONF.register_opts(wamp_opts, 'wamp')
|
|||
|
||||
shared_result = {}
|
||||
wamp_session_caller = None
|
||||
AGENT_UUID = None
|
||||
AGENT_HOST = None
|
||||
|
||||
|
||||
def wamp_request(e, kwarg, session):
|
||||
|
@ -92,7 +94,7 @@ class WampEndpoint(object):
|
|||
|
||||
class WampFrontend(wamp.ApplicationSession):
|
||||
def onJoin(self, details):
|
||||
global wamp_session_caller, AGENT_UUID
|
||||
global wamp_session_caller, AGENT_HOST
|
||||
wamp_session_caller = self
|
||||
import iotronic.wamp.registerd_functions as fun
|
||||
|
||||
|
@ -101,7 +103,7 @@ class WampFrontend(wamp.ApplicationSession):
|
|||
|
||||
try:
|
||||
self.register(fun.registration, u'stack4things.register')
|
||||
self.register(fun.echo, AGENT_UUID + u'.stack4things.echo')
|
||||
self.register(fun.echo, AGENT_HOST + u'.stack4things.echo')
|
||||
LOG.info("procedure registered")
|
||||
except Exception as e:
|
||||
LOG.error("could not register procedure: {0}".format(e))
|
||||
|
@ -130,16 +132,17 @@ class WampClientFactory(websocket.WampWebSocketClientFactory,
|
|||
|
||||
|
||||
class RPCServer(Thread):
|
||||
def __init__(self, agent_uuid):
|
||||
def __init__(self):
|
||||
global AGENT_HOST
|
||||
|
||||
# AMQP CONFIG
|
||||
endpoints = [
|
||||
WampEndpoint(WampFrontend, agent_uuid),
|
||||
WampEndpoint(WampFrontend, AGENT_HOST),
|
||||
]
|
||||
|
||||
Thread.__init__(self)
|
||||
transport = oslo_messaging.get_transport(CONF)
|
||||
target = oslo_messaging.Target(topic=agent_uuid + '.s4t_invoke_wamp',
|
||||
target = oslo_messaging.Target(topic=AGENT_HOST + '.s4t_invoke_wamp',
|
||||
server='server1')
|
||||
|
||||
self.server = oslo_messaging.get_rpc_server(transport,
|
||||
|
@ -184,16 +187,38 @@ class WampManager(object):
|
|||
|
||||
|
||||
class WampAgent(object):
|
||||
def __init__(self):
|
||||
def __init__(self, host):
|
||||
|
||||
logging.register_options(CONF)
|
||||
CONF(project='iotronic')
|
||||
logging.setup(CONF, "iotronic-wamp-agent")
|
||||
|
||||
agent_uuid = 'agent'
|
||||
global AGENT_UUID
|
||||
AGENT_UUID = agent_uuid
|
||||
# to be removed asap
|
||||
self.host = host
|
||||
self.dbapi = dbapi.get_instance()
|
||||
|
||||
r = RPCServer(agent_uuid)
|
||||
try:
|
||||
wpa = self.dbapi.register_wampagent(
|
||||
{'hostname': self.host})
|
||||
|
||||
except exception.ConductorAlreadyRegistered:
|
||||
LOG.warn(_LW("A conductor with hostname %(hostname)s "
|
||||
"was previously registered. Updating registration"),
|
||||
{'hostname': self.host})
|
||||
|
||||
except exception.WampAgentAlreadyRegistered:
|
||||
LOG.warn(_LW("A wampagent with hostname %(hostname)s "
|
||||
"was previously registered. Updating registration"),
|
||||
{'hostname': self.host})
|
||||
|
||||
wpa = self.dbapi.register_wampagent({'hostname': self.host},
|
||||
update_existing=True)
|
||||
self.wampagent = wpa
|
||||
|
||||
global AGENT_HOST
|
||||
AGENT_HOST = self.host
|
||||
|
||||
r = RPCServer()
|
||||
w = WampManager()
|
||||
|
||||
try:
|
||||
|
|
|
@ -35,6 +35,23 @@ ENGINE = InnoDB
|
|||
AUTO_INCREMENT = 6
|
||||
DEFAULT CHARACTER SET = utf8;
|
||||
|
||||
-- -----------------------------------------------------
|
||||
-- Table `iotronic`.`wampagents`
|
||||
-- -----------------------------------------------------
|
||||
DROP TABLE IF EXISTS `iotronic`.`wampagents` ;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS `iotronic`.`wampagents` (
|
||||
`created_at` DATETIME NULL DEFAULT NULL,
|
||||
`updated_at` DATETIME NULL DEFAULT NULL,
|
||||
`id` INT(11) NOT NULL AUTO_INCREMENT,
|
||||
`hostname` VARCHAR(255) NOT NULL,
|
||||
`online` TINYINT(1) NULL DEFAULT NULL,
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE INDEX `uniq_wampagents0hostname` (`hostname` ASC))
|
||||
ENGINE = InnoDB
|
||||
AUTO_INCREMENT = 6
|
||||
DEFAULT CHARACTER SET = utf8;
|
||||
|
||||
|
||||
-- -----------------------------------------------------
|
||||
-- Table `iotronic`.`nodes`
|
||||
|
|
Loading…
Reference in New Issue