Cyborg Conductor Stubs

This is a set of stubs for the Cyborg conductor with working
rpc and message handling although nothing really
to do with those messages until the DB component is formalized
and created in the next patch.

Change-Id: I0df68a165881697c7b199189a89471baf9ece2af
This commit is contained in:
jkilpatr 2017-06-09 08:39:17 -04:00
parent dad0795e7b
commit 0cbb31f02c
6 changed files with 187 additions and 0 deletions

View File

View File

@ -0,0 +1,3 @@
[DEFAULT]
transport_url=
server_id=

View File

@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import conf
import eventlet
import handlers
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
import rpcapi
import time
eventlet.monkey_patch()
CONF = cfg.CONF
conf.register_opts(CONF)
LOG = logging.getLogger(__name__)
logging.register_options(CONF)
logging.setup(CONF, 'Cyborg.Conductor')
url = messaging.TransportURL.parse(CONF, url=CONF.transport_url)
transport = messaging.get_notification_transport(CONF, url)
message_endpoints = [
handlers.NotificationEndpoint
]
message_targets = [
messaging.Target(topic='info'),
messaging.Target(topic='update'),
messaging.Target(topic='warn'),
messaging.Target(topic='error')
]
rpc_targets = messaging.Target(topic='cyborg_control', server=CONF.server_id)
rpc_endpoints = [
rpcapi.RPCEndpoint()
]
access_policy = messaging.ExplicitRPCAccessPolicy
rpc_server = messaging.get_rpc_server(transport,
rpc_targets,
rpc_endpoints,
executor='eventlet',
access_policy=access_policy)
pool = "listener-workers"
message_server = messaging.get_notification_listener(transport,
message_targets,
message_endpoints,
executor='eventlet',
allow_requeue=True)
try:
message_server.start()
rpc_server.start()
print("Cyborg Conductor running")
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping server")
message_server.stop()
rpc_server.stop()
message_server.wait()
rpc_server.wait()

28
cyborg/conductor/conf.py Normal file
View File

@ -0,0 +1,28 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
import uuid
default_opts = [
cfg.StrOpt('transport_url',
default='',
help='Transport url for messating'),
cfg.StrOpt('server_id',
default=uuid.uuid4(),
help='Unique ID for this conductor instance'),
]
def register_opts(conf):
conf.register_opts(default_opts)

View File

@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class NotificationEndpoint(object):
# filter_rule = messaging.NotificationFilter(publisher_id='^cyborg.*')
# We have an update from an agent and we need to add it to our in memory
# cache of accelerator objects and schedule a flush to the database
def update(self, ctxt, publisher_id, event_type, payload, metadata):
print("Got update")
return True
# We have an info message from an agent, anything that wouldn't
# go into the db but needs to be communicated goes here
def info(self, ctxt, publisher_id, event_type, payload, metadata):
print("Got info")
return True
# We have a warning from an agent, we may take some action
def warn(self, ctxt, publisher_id, event_type, payload, metadata):
print("Got warn")
return True
# We have an error from an agent, we must take some action
def error(self, ctxt, publisher_id, event_type, payload, metadata):
print("Got error")
return True

View File

@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class RPCEndpoint(object):
# Conductor functions exposed for external calls
# Mostly called by the API?
def __init__(self):
pass
# Returns a list of all accelerators managed by Cyborg
def list_accelerators(self, ctxt):
pass
# Returns an accelerator from the DB
def get_accelerator(self, ctxt, accelerator):
pass
# Deletes an accelerator from the DB and from the agent that hosts it
def delete_accelerator(self, ctxt, accelerator):
pass
# Updates an accelerator both in the DB and on the agent that hosts it
def update_accelerator(self, ctxt, accelerator):
pass
# Runs discovery on either a single agent or all agents
def discover_accelerators(self, ctxt, agent_id=None):
pass