Merge "aggent support resource tracker for FPGA"

This commit is contained in:
Zuul 2018-02-04 17:09:07 +00:00 committed by Gerrit Code Review
commit af5dd8e1ed
3 changed files with 178 additions and 3 deletions

View File

@ -14,24 +14,42 @@
# under the License.
import oslo_messaging as messaging
from oslo_service import periodic_task
from cyborg.accelerator.drivers.fpga.base import FPGADriver
from cyborg.agent.resource_tracker import ResourceTracker
from cyborg.conductor import rpcapi as cond_api
from cyborg.conf import CONF
class AgentManager(object):
class AgentManager(periodic_task.PeriodicTasks):
"""Cyborg Agent manager main class."""
RPC_API_VERSION = '1.0'
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, topic, host=None):
super(AgentManager, self).__init__()
super(AgentManager, self).__init__(CONF)
self.topic = topic
self.host = host or CONF.host
self.fpga_driver = FPGADriver()
self.cond_api = cond_api.ConductorAPI()
self._rt = ResourceTracker(host, self.cond_api)
def periodic_tasks(self, context, raise_on_error=False):
pass
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
def hardware_list(self, context, values):
"""List installed hardware."""
pass
def fpga_program(self, context, accelerator, image):
""" Program a FPGA regoin, image can be a url or local file"""
# TODO (Shaohe Feng) Get image from glance.
# And add claim and rollback logical.
raise NotImplementedError()
@periodic_task.periodic_task(run_immediately=True)
def update_available_resource(self, context, startup=True):
"""update all kinds of accelerator resources from their drivers."""
self._rt.update_usage(context)

View File

@ -0,0 +1,153 @@
# Copyright (c) 2018 Intel.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Track resources like FPGA GPU and QAT for a host. Provides the
conductor with useful information about availability through the accelerator
model.
"""
from oslo_log import log as logging
from oslo_messaging.rpc.client import RemoteError
from oslo_utils import uuidutils
from cyborg.accelerator.drivers.fpga.base import FPGADriver
from cyborg.common import utils
from cyborg import objects
LOG = logging.getLogger(__name__)
AGENT_RESOURCE_SEMAPHORE = "agent_resources"
DEPLOYABLE_VERSION = "1.0"
# need to change the driver field name
DEPLOYABLE_HOST_MAPS = {"assignable": "assignable",
"pcie_address": "devices",
"board": "product_id",
"type": "function",
"vendor": "vendor_id",
"name": "name"}
class ResourceTracker(object):
"""Agent helper class for keeping track of resource usage as instances
are built and destroyed.
"""
def __init__(self, host, cond_api):
# FIXME (Shaohe) local cache for Accelerator.
# Will fix it in next release.
self.fpgas = None
self.host = host
self.conductor_api = cond_api
self.fpga_driver = FPGADriver()
@utils.synchronized(AGENT_RESOURCE_SEMAPHORE)
def claim(self, context):
pass
def _fpga_compare_and_update(self, host_dev, acclerator):
need_updated = False
for k, v in DEPLOYABLE_HOST_MAPS.items():
if acclerator[k] != host_dev[v]:
need_updated = True
acclerator[k] = host_dev[v]
return need_updated
def _gen_deployable_from_host_dev(self, host_dev):
dep = {}
for k, v in DEPLOYABLE_HOST_MAPS.items():
dep[k] = host_dev[v]
dep["host"] = self.host
dep["version"] = DEPLOYABLE_VERSION
dep["availability"] = "free"
dep["uuid"] = uuidutils.generate_uuid()
return dep
@utils.synchronized(AGENT_RESOURCE_SEMAPHORE)
def update_usage(self, context):
"""Update the resource usage and stats after a change in an
instance
"""
def create_deployable(fpgas, bdf, parent_uuid=None):
fpga = fpgas[bdf]
dep = self._gen_deployable_from_host_dev(fpga)
# if parent_uuid:
dep["parent_uuid"] = parent_uuid
obj_dep = objects.Deployable(context, **dep)
new_dep = self.conductor_api.deployable_create(context, obj_dep)
return new_dep
# NOTE(Shaohe Feng) need more agreement on how to keep consistency.
fpgas = self._get_fpga_devices()
bdfs = set(fpgas.keys())
deployables = self.conductor_api.deployable_get_by_host(
context, self.host)
# NOTE(Shaohe Feng) when no "pcie_address" in deployable?
accls = dict([(v["pcie_address"], v) for v in deployables])
accl_bdfs = set(accls.keys())
# Firstly update
for mutual in accl_bdfs & bdfs:
accl = accls[mutual]
if self._fpga_compare_and_update(fpgas[mutual], accl):
try:
self.conductor_api.deployable_update(context, accl)
except RemoteError as e:
LOG.error(e)
# Add
new = bdfs - accl_bdfs
new_pf = set([n for n in new if fpgas[n]["function"] == "pf"])
for n in new_pf:
new_dep = create_deployable(fpgas, n)
accls[n] = new_dep
sub_vf = set()
if "regions" in n:
sub_vf = set([sub["devices"] for sub in fpgas[n]["regions"]])
for vf in sub_vf & new:
new_dep = create_deployable(fpgas, vf, new_dep["uuid"])
accls[vf] = new_dep
new.remove(vf)
for n in new - new_pf:
p_bdf = fpgas[n]["parent_devices"]
p_accl = accls[p_bdf]
p_uuid = p_accl["uuid"]
new_dep = create_deployable(fpgas, n, p_uuid)
# Delete
for obsolete in accl_bdfs - bdfs:
try:
self.conductor_api.deployable_delete(context, accls[obsolete])
except RemoteError as e:
LOG.error(e)
del accls[obsolete]
def _get_fpga_devices(self):
def form_dict(devices, fpgas):
for v in devices:
fpgas[v["devices"]] = v
if "regions" in v:
form_dict(v["regions"], fpgas)
fpgas = {}
vendors = self.fpga_driver.discover_vendors()
for v in vendors:
driver = self.fpga_driver.create(v)
form_dict(driver.discover(), fpgas)
return fpgas

View File

@ -16,12 +16,16 @@
"""Utilities and helper functions."""
from oslo_log import log
from oslo_concurrency import lockutils
import six
LOG = log.getLogger(__name__)
synchronized = lockutils.synchronized_with_prefix('cyborg-')
def safe_rstrip(value, chars=None):
"""Removes trailing characters from a string if that does not make it empty