Add "Report device data to cyborg"

1. Load drivers with stevedore
2. Now update_usage just do the discover, add report data when
   conductor api ready.

Change-Id: Ia813c5a8dd8f29ce689204e52b6e1f691633f5fc
This commit is contained in:
wangzh21 2018-08-27 19:48:03 +08:00
parent 34eac481a4
commit ca225fc1bf
6 changed files with 91 additions and 220 deletions

View File

@ -18,31 +18,20 @@ Track resources like FPGA GPU and QAT for a host. Provides the
conductor with useful information about availability through the accelerator conductor with useful information about availability through the accelerator
model. model.
""" """
from oslo_log import log as logging
from oslo_messaging.rpc.client import RemoteError
from oslo_utils import uuidutils
from cyborg.accelerator.drivers.fpga.base import FPGADriver from oslo_log import log as logging
from stevedore import driver
from stevedore.extension import ExtensionManager
from cyborg.common import exception
from cyborg.common import utils from cyborg.common import utils
from cyborg import objects from cyborg.conf import CONF
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
AGENT_RESOURCE_SEMAPHORE = "agent_resources" AGENT_RESOURCE_SEMAPHORE = "agent_resources"
DEPLOYABLE_VERSION = "1.0"
# need to change the driver field name
DEPLOYABLE_HOST_MAPS = {"assignable": "assignable",
"address": "devices",
"board": "product_id",
"type": "function",
"vendor": "vendor_id",
"name": "name",
"interface_type": "interface_type"
}
class ResourceTracker(object): class ResourceTracker(object):
"""Agent helper class for keeping track of resource usage as instances """Agent helper class for keeping track of resource usage as instances
@ -50,149 +39,35 @@ class ResourceTracker(object):
""" """
def __init__(self, host, cond_api): def __init__(self, host, cond_api):
# FIXME (Shaohe) local cache for Accelerator.
# Will fix it in next release.
self.fpgas = None
self.host = host self.host = host
self.conductor_api = cond_api self.conductor_api = cond_api
self.fpga_driver = FPGADriver() self.acc_drivers = []
self._initialize_drivers()
@utils.synchronized(AGENT_RESOURCE_SEMAPHORE) def _initialize_drivers(self, enabled_drivers=[]):
def claim(self, context):
pass
def _fpga_compare_and_update(self, host_dev, acclerator):
need_updated = False
for k, v in DEPLOYABLE_HOST_MAPS.items():
if acclerator[k] != host_dev[v]:
need_updated = True
acclerator[k] = host_dev[v]
return need_updated
def _gen_accelerator_for_deployable(
self, context, name, vendor, productor, desc="", dev_type="pf",
acc_type="FPGA", acc_cap="", remotable=0):
""" """
The type of the accelerator device, e.g GPU, FPGA, ... Load accelerator drivers.
acc_type defines the usage of the accelerator, e.g Crypto :return: [nvidia_gpu_driver_obj, intel_fpga_driver_obj]
acc_capability defines the specific capability, e.g AES
""" """
db_acc = { acc_drivers = []
'deleted': False, if not enabled_drivers:
'uuid': uuidutils.generate_uuid(), enabled_drivers = CONF.agent.enabled_drivers
'name': name, valid_drivers = ExtensionManager(
'description': desc, namespace='cyborg.accelerator.driver').names()
'project_id': context.project_id, for d in enabled_drivers:
'user_id': context.user_id, if d not in valid_drivers:
'device_type': dev_type, raise exception.InvalidDriver(name=d)
'acc_type': acc_type, acc_driver = driver.DriverManager(
'acc_capability': acc_cap, namespace='cyborg.accelerator.driver', name=d,
'vendor_id': vendor, invoke_on_load=True).driver
'product_id': productor, acc_drivers.append(acc_driver)
'remotable': remotable self.acc_drivers = acc_drivers
}
acc = objects.Accelerator(context, **db_acc)
acc = self.conductor_api.accelerator_create(context, acc)
return acc
def _gen_deployable_from_host_dev(self, host_dev, acc_id,
parent_uuid=None, root_uuid=None):
dep = {}
for k, v in DEPLOYABLE_HOST_MAPS.items():
dep[k] = host_dev[v]
dep["host"] = self.host
dep["version"] = DEPLOYABLE_VERSION
dep["availability"] = "free"
dep["uuid"] = uuidutils.generate_uuid()
dep["parent_uuid"] = parent_uuid
dep["root_uuid"] = root_uuid
dep["accelerator_id"] = acc_id
return dep
@utils.synchronized(AGENT_RESOURCE_SEMAPHORE) @utils.synchronized(AGENT_RESOURCE_SEMAPHORE)
def update_usage(self, context): def update_usage(self, context):
"""Update the resource usage and stats after a change in an """Update the resource usage periodically.
instance
""" """
def create_deployable(fpgas, bdf, acc_id, parent_uuid=None): acc_list = []
fpga = fpgas[bdf] for acc_driver in self.acc_drivers:
dep = self._gen_deployable_from_host_dev(fpga, acc_id) acc_list.extend(acc_driver.discover())
# if parent_uuid: # Call conductor_api here to diff and report acc data.
dep["parent_uuid"] = parent_uuid
obj_dep = objects.Deployable(context, **dep)
new_dep = self.conductor_api.deployable_create(context, obj_dep)
return new_dep
# NOTE(Shaohe Feng) need more agreement on how to keep consistency.
fpgas = self._get_fpga_devices()
bdfs = set(fpgas.keys())
deployables = self.conductor_api.deployable_get_by_host(
context, self.host)
# NOTE(Shaohe Feng) when no "address" in deployable?
accls = dict([(v["address"], v) for v in deployables])
accl_bdfs = set(accls.keys())
# Firstly update
for mutual in accl_bdfs & bdfs:
accl = accls[mutual]
if self._fpga_compare_and_update(fpgas[mutual], accl):
try:
self.conductor_api.deployable_update(context, accl)
except RemoteError as e:
LOG.error(e)
# Add
new = bdfs - accl_bdfs
new_pf = set([n for n in new if fpgas[n]["function"] == "pf"])
for n in new_pf:
fpga = fpgas[n]
acc = self._gen_accelerator_for_deployable(
context, fpga["name"], fpga["vendor_id"], fpga["product_id"],
"FPGA device on %s" % self.host, "pf", "FPGA")
new_dep = create_deployable(fpgas, n, acc.id)
accls[n] = new_dep
sub_vf = set()
if "regions" in n:
sub_vf = set([sub["devices"] for sub in fpgas[n]["regions"]])
for vf in sub_vf & new:
fpga = fpgas[n]
acc = self._gen_accelerator_for_deployable(
context, fpga["name"], fpga["vendor_id"],
fpga["product_id"], "FPGA device on %s" % self.host,
"vf", "FPGA")
new_dep = create_deployable(fpgas, vf, acc.id, new_dep["uuid"])
accls[vf] = new_dep
new.remove(vf)
for n in new - new_pf:
p_bdf = fpgas[n]["parent_devices"]
p_accl = accls[p_bdf]
p_uuid = p_accl["uuid"]
fpga = fpgas[n]
acc = self._gen_accelerator_for_deployable(
context, fpga["name"], fpga["vendor_id"], fpga["product_id"],
"FPGA device on %s" % self.host, "pf", "FPGA")
new_dep = create_deployable(fpgas, n, acc.id, p_uuid)
# Delete
for obsolete in accl_bdfs - bdfs:
try:
self.conductor_api.deployable_delete(context, accls[obsolete])
except RemoteError as e:
LOG.error(e)
del accls[obsolete]
def _get_fpga_devices(self):
def form_dict(devices, fpgas):
for v in devices:
fpgas[v["devices"]] = v
if "regions" in v:
form_dict(v["regions"], fpgas)
fpgas = {}
vendors = self.fpga_driver.discover_vendors()
for v in vendors:
driver = self.fpga_driver.create(v)
form_dict(driver.discover(), fpgas)
return fpgas

View File

@ -327,3 +327,7 @@ class ImageNotFound(NotFound):
class ImageBadRequest(Invalid): class ImageBadRequest(Invalid):
msg_fmt = _("Request of image %(image_id)s got BadRequest response: " msg_fmt = _("Request of image %(image_id)s got BadRequest response: "
"%(response)s") "%(response)s")
class InvalidDriver(Invalid):
_msg_fmt = _("Found an invalid driver: %(name)s")

View File

@ -15,6 +15,7 @@
from oslo_config import cfg from oslo_config import cfg
from cyborg.conf import agent
from cyborg.conf import api from cyborg.conf import api
from cyborg.conf import database from cyborg.conf import database
from cyborg.conf import default from cyborg.conf import default
@ -25,6 +26,7 @@ from cyborg.conf import keystone
CONF = cfg.CONF CONF = cfg.CONF
api.register_opts(CONF) api.register_opts(CONF)
agent.register_opts(CONF)
database.register_opts(CONF) database.register_opts(CONF)
default.register_opts(CONF) default.register_opts(CONF)
default.register_placement_opts(CONF) default.register_placement_opts(CONF)

42
cyborg/conf/agent.py Normal file
View File

@ -0,0 +1,42 @@
# Copyright 2018 Beijing Lenovo Software Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from cyborg.common.i18n import _
opts = [
cfg.ListOpt('enabled_drivers',
default=[],
help=_('The accelerator drivers enabled on this agent. Such '
'as intel_fpga_driver, nvidia_gpu_driver, etc.')),
]
opt_group = cfg.OptGroup(name='agent',
title='Options for the cyborg-agent service')
AGENT_OPTS = (opts)
def register_opts(conf):
conf.register_group(opt_group)
conf.register_opts(opts, group=opt_group)
def list_opts():
return {
opt_group: AGENT_OPTS
}

View File

@ -15,18 +15,11 @@
"""Cyborg agent resource_tracker test cases.""" """Cyborg agent resource_tracker test cases."""
import os
import fixtures
from cyborg.accelerator.drivers.fpga import utils
from cyborg.accelerator.drivers.fpga.intel import sysinfo
from cyborg.agent.resource_tracker import ResourceTracker from cyborg.agent.resource_tracker import ResourceTracker
from cyborg.common import exception
from cyborg.conductor import rpcapi as cond_api from cyborg.conductor import rpcapi as cond_api
from cyborg.conf import CONF from cyborg.conf import CONF
from cyborg.tests import base from cyborg.tests import base
from cyborg.tests.unit.accelerator.drivers.fpga.intel import prepare_test_data
class TestResourceTracker(base.TestCase): class TestResourceTracker(base.TestCase):
@ -34,22 +27,10 @@ class TestResourceTracker(base.TestCase):
def setUp(self): def setUp(self):
super(TestResourceTracker, self).setUp() super(TestResourceTracker, self).setUp()
self.syspath = sysinfo.SYS_FPGA
sysinfo.SYS_FPGA = "/sys/class/fpga"
tmp_sys_dir = self.useFixture(fixtures.TempDir())
prepare_test_data.create_fake_sysfs(tmp_sys_dir.path)
sysinfo.SYS_FPGA = os.path.join(
tmp_sys_dir.path, sysinfo.SYS_FPGA.split("/", 1)[-1])
utils.SYS_FPGA_PATH = sysinfo.SYS_FPGA
self.host = CONF.host self.host = CONF.host
self.cond_api = cond_api.ConductorAPI() self.cond_api = cond_api.ConductorAPI()
self.rt = ResourceTracker(self.host, self.cond_api) self.rt = ResourceTracker(self.host, self.cond_api)
def tearDown(self):
super(TestResourceTracker, self).tearDown()
sysinfo.SYS_FPGA = self.syspath
utils.SYS_FPGA_PATH = self.syspath
def test_update_usage(self): def test_update_usage(self):
"""Update the resource usage and stats after a change in an """Update the resource usage and stats after a change in an
instance instance
@ -58,50 +39,13 @@ class TestResourceTracker(base.TestCase):
# has stored into DB by conductor correctly? # has stored into DB by conductor correctly?
pass pass
def test_get_fpga_devices(self): def test_initialize_acc_drivers(self):
expect = { enabled_drivers = ['intel_fpga_driver']
'0000:5e:00.0': { self.rt._initialize_drivers(enabled_drivers=enabled_drivers)
'function': 'pf', drivers = self.rt.acc_drivers
'assignable': False, self.assertEqual(len(drivers), len(enabled_drivers))
'pr_num': '1',
'name': 'intel-fpga-dev.0',
'interface_type': 'pci',
'vendor_id': '0x8086',
'devices': '0000:5e:00.0',
'regions': [{
'function': 'vf',
'assignable': True,
'name': 'intel-fpga-dev.2',
'interface_type': 'pci',
'vendor_id': '0x8086',
'devices': '0000:5e:00.1',
'parent_devices': '0000:5e:00.0',
'path': '%s/intel-fpga-dev.2' % sysinfo.SYS_FPGA,
'product_id': '0xbcc1'}],
'parent_devices': '',
'path': '%s/intel-fpga-dev.0' % sysinfo.SYS_FPGA,
'product_id': '0xbcc0'},
'0000:5e:00.1': {
'function': 'vf',
'assignable': True,
'name': 'intel-fpga-dev.2',
'interface_type': 'pci',
'vendor_id': '0x8086',
'devices': '0000:5e:00.1',
'parent_devices': '0000:5e:00.0',
'path': '%s/intel-fpga-dev.2' % sysinfo.SYS_FPGA,
'product_id': '0xbcc1'},
'0000:be:00.0': {
'function': 'pf',
'assignable': True,
'pr_num': '0',
'name': 'intel-fpga-dev.1',
'interface_type': 'pci',
'vendor_id': '0x8086',
'devices': '0000:be:00.0',
'parent_devices': '',
'path': '%s/intel-fpga-dev.1' % sysinfo.SYS_FPGA,
'product_id': '0xbcc0'}}
fpgas = self.rt._get_fpga_devices() def test_initialize_invalid_driver(self):
self.assertDictEqual(expect, fpgas) enabled_drivers = ['invalid_driver']
self.assertRaises(exception.InvalidDriver, self.rt._initialize_drivers,
enabled_drivers)

View File

@ -45,6 +45,10 @@ wsgi_scripts =
cyborg.database.migration_backend = cyborg.database.migration_backend =
sqlalchemy = cyborg.db.sqlalchemy.migration sqlalchemy = cyborg.db.sqlalchemy.migration
cyborg.accelerator.driver =
intel_fpga_driver = cyborg.accelerator.drivers.fpga.intel.driver:IntelFPGADriver
nvmf_spdk_driver = cyborg.accelerator.drivers.spdk.nvmf.nvmf:NVMFDRIVER
oslo.config.opts = oslo.config.opts =
cyborg = cyborg.conf.opts:list_opts cyborg = cyborg.conf.opts:list_opts