Add "Report device data to cyborg"

1. Load drivers with stevedore
2. For now, update_usage only runs device discovery; reporting the
   discovered data will be added once the conductor API is ready.

Change-Id: Ia813c5a8dd8f29ce689204e52b6e1f691633f5fc
This commit is contained in:
wangzh21 2018-08-27 19:48:03 +08:00
parent 34eac481a4
commit ca225fc1bf
6 changed files with 91 additions and 220 deletions

View File

@ -18,31 +18,20 @@ Track resources like FPGA GPU and QAT for a host. Provides the
conductor with useful information about availability through the accelerator
model.
"""
from oslo_log import log as logging
from oslo_messaging.rpc.client import RemoteError
from oslo_utils import uuidutils
from cyborg.accelerator.drivers.fpga.base import FPGADriver
from oslo_log import log as logging
from stevedore import driver
from stevedore.extension import ExtensionManager
from cyborg.common import exception
from cyborg.common import utils
from cyborg import objects
from cyborg.conf import CONF
# Module-level logger.
LOG = logging.getLogger(__name__)
# Name handed to utils.synchronized() by the ResourceTracker methods
# (claim, update_usage) that are decorated with it.
AGENT_RESOURCE_SEMAPHORE = "agent_resources"
# Value written to the "version" field of each deployable dict built by
# _gen_deployable_from_host_dev().
DEPLOYABLE_VERSION = "1.0"
# Maps a deployable field name (key) to the key of the host device dict
# (value) that the field is read from.
# need to change the driver field name
DEPLOYABLE_HOST_MAPS = {"assignable": "assignable",
"address": "devices",
"board": "product_id",
"type": "function",
"vendor": "vendor_id",
"name": "name",
"interface_type": "interface_type"
}
class ResourceTracker(object):
"""Agent helper class for keeping track of resource usage as instances
@ -50,149 +39,35 @@ class ResourceTracker(object):
"""
def __init__(self, host, cond_api):
    """Store the host and conductor API, then load the drivers.

    :param host: host identifier, kept as ``self.host``.
    :param cond_api: conductor API client, kept as ``self.conductor_api``.
    """
    # FIXME (Shaohe) local cache for Accelerator.
    # Will fix it in next release.
    self.fpgas = None

    self.host = host
    self.conductor_api = cond_api

    self.fpga_driver = FPGADriver()

    # Start from an empty list; _initialize_drivers() replaces it with the
    # driver objects it loads.
    self.acc_drivers = []
    self._initialize_drivers()
# No-op placeholder: the body does nothing with ``context`` yet. It is
# wrapped by utils.synchronized() using the AGENT_RESOURCE_SEMAPHORE name.
@utils.synchronized(AGENT_RESOURCE_SEMAPHORE)
def claim(self, context):
pass
def _fpga_compare_and_update(self, host_dev, acclerator):
    """Bring ``acclerator`` in line with ``host_dev``, in place.

    Each field named in DEPLOYABLE_HOST_MAPS is compared against the
    matching host device entry; fields that differ are overwritten with
    the host device value.

    :param host_dev: mapping indexed by the DEPLOYABLE_HOST_MAPS values.
    :param acclerator: mapping indexed by the DEPLOYABLE_HOST_MAPS keys;
        modified in place.
    :returns: True if at least one field was overwritten, else False.
    """
    changed = False
    for field, dev_key in DEPLOYABLE_HOST_MAPS.items():
        if acclerator[field] != host_dev[dev_key]:
            acclerator[field] = host_dev[dev_key]
            changed = True
    return changed
def _gen_accelerator_for_deployable(
        self, context, name, vendor, productor, desc="", dev_type="pf",
        acc_type="FPGA", acc_cap="", remotable=0):
    """Build an Accelerator object and create it through the conductor.

    The type of the accelerator device, e.g GPU, FPGA, ...
    acc_type defines the usage of the accelerator, e.g Crypto
    acc_cap (stored as acc_capability) defines the specific capability,
    e.g AES

    :param context: request context; supplies project_id and user_id.
    :param name: stored as the accelerator name.
    :param vendor: stored as vendor_id.
    :param productor: stored as product_id.
    :param desc: stored as description.
    :param dev_type: stored as device_type.
    :param remotable: stored as remotable.
    :returns: whatever ``conductor_api.accelerator_create`` returns.
    """
    db_acc = {
        'deleted': False,
        'uuid': uuidutils.generate_uuid(),
        'name': name,
        'description': desc,
        'project_id': context.project_id,
        'user_id': context.user_id,
        'device_type': dev_type,
        'acc_type': acc_type,
        'acc_capability': acc_cap,
        'vendor_id': vendor,
        'product_id': productor,
        'remotable': remotable
    }
    acc = objects.Accelerator(context, **db_acc)
    acc = self.conductor_api.accelerator_create(context, acc)
    return acc

def _initialize_drivers(self, enabled_drivers=None):
    """Load accelerator drivers.

    The loaded driver objects are stored on ``self.acc_drivers``, e.g.
    [nvidia_gpu_driver_obj, intel_fpga_driver_obj]; nothing is returned.

    :param enabled_drivers: names of the drivers to load. When None or
        empty, ``CONF.agent.enabled_drivers`` is used instead.
    :raises exception.InvalidDriver: if a requested name is not among the
        extensions registered in the 'cyborg.accelerator.driver'
        namespace.
    """
    # The default is None rather than a shared mutable list; the falsy
    # check below treats None and [] the same way.
    acc_drivers = []
    if not enabled_drivers:
        enabled_drivers = CONF.agent.enabled_drivers
    valid_drivers = ExtensionManager(
        namespace='cyborg.accelerator.driver').names()
    for d in enabled_drivers:
        if d not in valid_drivers:
            raise exception.InvalidDriver(name=d)
        acc_driver = driver.DriverManager(
            namespace='cyborg.accelerator.driver', name=d,
            invoke_on_load=True).driver
        acc_drivers.append(acc_driver)
    # Assign only after every driver loaded, so a failure above leaves the
    # previous list untouched.
    self.acc_drivers = acc_drivers

def _gen_deployable_from_host_dev(self, host_dev, acc_id,
                                  parent_uuid=None, root_uuid=None):
    """Build the dict describing a deployable for one host device.

    Fields listed in DEPLOYABLE_HOST_MAPS are copied from ``host_dev``;
    the remaining fields are filled in here, including a freshly
    generated uuid.

    :param host_dev: mapping indexed by the DEPLOYABLE_HOST_MAPS values.
    :param acc_id: stored as accelerator_id.
    :param parent_uuid: stored as parent_uuid.
    :param root_uuid: stored as root_uuid.
    :returns: the new dict.
    """
    dep = {}
    for k, v in DEPLOYABLE_HOST_MAPS.items():
        dep[k] = host_dev[v]
    dep["host"] = self.host
    dep["version"] = DEPLOYABLE_VERSION
    dep["availability"] = "free"
    dep["uuid"] = uuidutils.generate_uuid()
    dep["parent_uuid"] = parent_uuid
    dep["root_uuid"] = root_uuid
    dep["accelerator_id"] = acc_id
    return dep
@utils.synchronized(AGENT_RESOURCE_SEMAPHORE)
def update_usage(self, context):
"""Update the resource usage and stats after a change in an
instance
"""Update the resource usage periodically.
"""
def create_deployable(fpgas, bdf, acc_id, parent_uuid=None):
fpga = fpgas[bdf]
dep = self._gen_deployable_from_host_dev(fpga, acc_id)
# if parent_uuid:
dep["parent_uuid"] = parent_uuid
obj_dep = objects.Deployable(context, **dep)
new_dep = self.conductor_api.deployable_create(context, obj_dep)
return new_dep
# NOTE(Shaohe Feng) need more agreement on how to keep consistency.
fpgas = self._get_fpga_devices()
bdfs = set(fpgas.keys())
deployables = self.conductor_api.deployable_get_by_host(
context, self.host)
# NOTE(Shaohe Feng) when no "address" in deployable?
accls = dict([(v["address"], v) for v in deployables])
accl_bdfs = set(accls.keys())
# Firstly update
for mutual in accl_bdfs & bdfs:
accl = accls[mutual]
if self._fpga_compare_and_update(fpgas[mutual], accl):
try:
self.conductor_api.deployable_update(context, accl)
except RemoteError as e:
LOG.error(e)
# Add
new = bdfs - accl_bdfs
new_pf = set([n for n in new if fpgas[n]["function"] == "pf"])
for n in new_pf:
fpga = fpgas[n]
acc = self._gen_accelerator_for_deployable(
context, fpga["name"], fpga["vendor_id"], fpga["product_id"],
"FPGA device on %s" % self.host, "pf", "FPGA")
new_dep = create_deployable(fpgas, n, acc.id)
accls[n] = new_dep
sub_vf = set()
if "regions" in n:
sub_vf = set([sub["devices"] for sub in fpgas[n]["regions"]])
for vf in sub_vf & new:
fpga = fpgas[n]
acc = self._gen_accelerator_for_deployable(
context, fpga["name"], fpga["vendor_id"],
fpga["product_id"], "FPGA device on %s" % self.host,
"vf", "FPGA")
new_dep = create_deployable(fpgas, vf, acc.id, new_dep["uuid"])
accls[vf] = new_dep
new.remove(vf)
for n in new - new_pf:
p_bdf = fpgas[n]["parent_devices"]
p_accl = accls[p_bdf]
p_uuid = p_accl["uuid"]
fpga = fpgas[n]
acc = self._gen_accelerator_for_deployable(
context, fpga["name"], fpga["vendor_id"], fpga["product_id"],
"FPGA device on %s" % self.host, "pf", "FPGA")
new_dep = create_deployable(fpgas, n, acc.id, p_uuid)
# Delete
for obsolete in accl_bdfs - bdfs:
try:
self.conductor_api.deployable_delete(context, accls[obsolete])
except RemoteError as e:
LOG.error(e)
del accls[obsolete]
def _get_fpga_devices(self):
def form_dict(devices, fpgas):
for v in devices:
fpgas[v["devices"]] = v
if "regions" in v:
form_dict(v["regions"], fpgas)
fpgas = {}
vendors = self.fpga_driver.discover_vendors()
for v in vendors:
driver = self.fpga_driver.create(v)
form_dict(driver.discover(), fpgas)
return fpgas
acc_list = []
for acc_driver in self.acc_drivers:
acc_list.extend(acc_driver.discover())
# Call conductor_api here to diff and report acc data.

View File

@ -327,3 +327,7 @@ class ImageNotFound(NotFound):
# Invalid-type error whose message template takes ``image_id`` and
# ``response``.
# NOTE(review): the template is stored in ``msg_fmt`` here, while
# InvalidDriver in this module uses ``_msg_fmt`` -- confirm which attribute
# the base exception actually reads.
class ImageBadRequest(Invalid):
msg_fmt = _("Request of image %(image_id)s got BadRequest response: "
"%(response)s")
# Invalid-type error whose message template takes the driver ``name``.
# NOTE(review): the template is stored in ``_msg_fmt`` here, while
# ImageBadRequest in this module uses ``msg_fmt`` -- confirm which attribute
# the base exception actually reads.
class InvalidDriver(Invalid):
_msg_fmt = _("Found an invalid driver: %(name)s")

View File

@ -15,6 +15,7 @@
from oslo_config import cfg
from cyborg.conf import agent
from cyborg.conf import api
from cyborg.conf import database
from cyborg.conf import default
@ -25,6 +26,7 @@ from cyborg.conf import keystone
CONF = cfg.CONF
api.register_opts(CONF)
agent.register_opts(CONF)
database.register_opts(CONF)
default.register_opts(CONF)
default.register_placement_opts(CONF)

42
cyborg/conf/agent.py Normal file
View File

@ -0,0 +1,42 @@
# Copyright 2018 Beijing Lenovo Software Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from cyborg.common.i18n import _
# Options registered under the ``agent`` group.
opts = [
    cfg.ListOpt(
        'enabled_drivers',
        default=[],
        help=_('The accelerator drivers enabled on this agent. Such '
               'as intel_fpga_driver, nvidia_gpu_driver, etc.'),
    ),
]

opt_group = cfg.OptGroup(
    name='agent',
    title='Options for the cyborg-agent service',
)

# Same list object as ``opts``; this is what list_opts() returns.
AGENT_OPTS = opts
def register_opts(conf):
    """Register the ``agent`` option group and its options on ``conf``.

    :param conf: configuration object exposing ``register_group`` and
        ``register_opts``.
    """
    conf.register_group(opt_group)
    conf.register_opts(opts, group=opt_group)
def list_opts():
    """Return the agent options keyed by their option group."""
    return {opt_group: AGENT_OPTS}

View File

@ -15,18 +15,11 @@
"""Cyborg agent resource_tracker test cases."""
import os
import fixtures
from cyborg.accelerator.drivers.fpga import utils
from cyborg.accelerator.drivers.fpga.intel import sysinfo
from cyborg.agent.resource_tracker import ResourceTracker
from cyborg.common import exception
from cyborg.conductor import rpcapi as cond_api
from cyborg.conf import CONF
from cyborg.tests import base
from cyborg.tests.unit.accelerator.drivers.fpga.intel import prepare_test_data
class TestResourceTracker(base.TestCase):
@ -34,22 +27,10 @@ class TestResourceTracker(base.TestCase):
# Points the FPGA sysfs path at a temporary directory, then builds the
# ResourceTracker under test.
def setUp(self):
super(TestResourceTracker, self).setUp()
# Keep the current value so tearDown() can put it back.
self.syspath = sysinfo.SYS_FPGA
# Start from a fixed absolute path before re-rooting it below.
sysinfo.SYS_FPGA = "/sys/class/fpga"
tmp_sys_dir = self.useFixture(fixtures.TempDir())
prepare_test_data.create_fake_sysfs(tmp_sys_dir.path)
# split("/", 1)[-1] drops the leading "/" so that os.path.join() places
# the path underneath the temporary directory.
sysinfo.SYS_FPGA = os.path.join(
tmp_sys_dir.path, sysinfo.SYS_FPGA.split("/", 1)[-1])
utils.SYS_FPGA_PATH = sysinfo.SYS_FPGA
self.host = CONF.host
self.cond_api = cond_api.ConductorAPI()
self.rt = ResourceTracker(self.host, self.cond_api)
def tearDown(self):
    """Put back the FPGA sysfs paths that setUp() saved in ``syspath``."""
    super(TestResourceTracker, self).tearDown()
    saved_path = self.syspath
    sysinfo.SYS_FPGA = saved_path
    utils.SYS_FPGA_PATH = saved_path
def test_update_usage(self):
"""Update the resource usage and stats after a change in an
instance
@ -58,50 +39,13 @@ class TestResourceTracker(base.TestCase):
# has stored into DB by conductor correctly?
pass
def test_get_fpga_devices(self):
expect = {
'0000:5e:00.0': {
'function': 'pf',
'assignable': False,
'pr_num': '1',
'name': 'intel-fpga-dev.0',
'interface_type': 'pci',
'vendor_id': '0x8086',
'devices': '0000:5e:00.0',
'regions': [{
'function': 'vf',
'assignable': True,
'name': 'intel-fpga-dev.2',
'interface_type': 'pci',
'vendor_id': '0x8086',
'devices': '0000:5e:00.1',
'parent_devices': '0000:5e:00.0',
'path': '%s/intel-fpga-dev.2' % sysinfo.SYS_FPGA,
'product_id': '0xbcc1'}],
'parent_devices': '',
'path': '%s/intel-fpga-dev.0' % sysinfo.SYS_FPGA,
'product_id': '0xbcc0'},
'0000:5e:00.1': {
'function': 'vf',
'assignable': True,
'name': 'intel-fpga-dev.2',
'interface_type': 'pci',
'vendor_id': '0x8086',
'devices': '0000:5e:00.1',
'parent_devices': '0000:5e:00.0',
'path': '%s/intel-fpga-dev.2' % sysinfo.SYS_FPGA,
'product_id': '0xbcc1'},
'0000:be:00.0': {
'function': 'pf',
'assignable': True,
'pr_num': '0',
'name': 'intel-fpga-dev.1',
'interface_type': 'pci',
'vendor_id': '0x8086',
'devices': '0000:be:00.0',
'parent_devices': '',
'path': '%s/intel-fpga-dev.1' % sysinfo.SYS_FPGA,
'product_id': '0xbcc0'}}
def test_initialize_acc_drivers(self):
    """One driver object is loaded per requested driver name."""
    requested = ['intel_fpga_driver']
    self.rt._initialize_drivers(enabled_drivers=requested)
    loaded = self.rt.acc_drivers
    self.assertEqual(len(loaded), len(requested))
fpgas = self.rt._get_fpga_devices()
self.assertDictEqual(expect, fpgas)
def test_initialize_invalid_driver(self):
    """Requesting an unknown driver name raises InvalidDriver."""
    unknown = ['invalid_driver']
    self.assertRaises(
        exception.InvalidDriver,
        self.rt._initialize_drivers,
        unknown)

View File

@ -45,6 +45,10 @@ wsgi_scripts =
cyborg.database.migration_backend =
sqlalchemy = cyborg.db.sqlalchemy.migration
cyborg.accelerator.driver =
intel_fpga_driver = cyborg.accelerator.drivers.fpga.intel.driver:IntelFPGADriver
nvmf_spdk_driver = cyborg.accelerator.drivers.spdk.nvmf.nvmf:NVMFDRIVER
oslo.config.opts =
cyborg = cyborg.conf.opts:list_opts