Merge "agent support resource tracker for FPGA"
This commit is contained in:
commit
af5dd8e1ed
|
@ -14,24 +14,42 @@
|
|||
# under the License.
|
||||
|
||||
import oslo_messaging as messaging
|
||||
from oslo_service import periodic_task
|
||||
|
||||
from cyborg.accelerator.drivers.fpga.base import FPGADriver
|
||||
from cyborg.agent.resource_tracker import ResourceTracker
|
||||
from cyborg.conductor import rpcapi as cond_api
|
||||
from cyborg.conf import CONF
|
||||
|
||||
|
||||
class AgentManager(periodic_task.PeriodicTasks):
    """Cyborg Agent manager main class.

    Runs on each accelerator host: discovers FPGA devices through the
    vendor drivers and reports them to the conductor via the
    ResourceTracker on a periodic task.
    """

    RPC_API_VERSION = '1.0'
    target = messaging.Target(version=RPC_API_VERSION)

    def __init__(self, topic, host=None):
        super(AgentManager, self).__init__(CONF)
        self.topic = topic
        self.host = host or CONF.host
        self.fpga_driver = FPGADriver()
        self.cond_api = cond_api.ConductorAPI()
        # BUG FIX: pass the resolved self.host (falls back to CONF.host)
        # rather than the raw ``host`` argument, which may be None.
        self._rt = ResourceTracker(self.host, self.cond_api)

    def periodic_tasks(self, context, raise_on_error=False):
        """Run all periodic tasks registered on this manager."""
        return self.run_periodic_tasks(context, raise_on_error=raise_on_error)

    def hardware_list(self, context, values):
        """List installed hardware."""
        pass

    def fpga_program(self, context, accelerator, image):
        """Program an FPGA region; ``image`` can be a URL or a local file."""
        # TODO (Shaohe Feng) Get image from glance.
        # And add claim and rollback logical.
        raise NotImplementedError()

    @periodic_task.periodic_task(run_immediately=True)
    def update_available_resource(self, context, startup=True):
        """Update all kinds of accelerator resources from their drivers."""
        self._rt.update_usage(context)
|
||||
|
|
|
@ -0,0 +1,153 @@
|
|||
# Copyright (c) 2018 Intel.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Track resources like FPGA GPU and QAT for a host. Provides the
|
||||
conductor with useful information about availability through the accelerator
|
||||
model.
|
||||
"""
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_messaging.rpc.client import RemoteError
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from cyborg.accelerator.drivers.fpga.base import FPGADriver
|
||||
from cyborg.common import utils
|
||||
from cyborg import objects
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)

# Lock name used to serialize ResourceTracker methods that mutate the
# agent-side accelerator state (see utils.synchronized usages below).
AGENT_RESOURCE_SEMAPHORE = "agent_resources"

# Version stamped on every deployable record this tracker creates.
DEPLOYABLE_VERSION = "1.0"

# need to change the driver field name
# Maps Deployable field name -> key in the driver-reported device dict.
DEPLOYABLE_HOST_MAPS = {"assignable": "assignable",
                        "pcie_address": "devices",
                        "board": "product_id",
                        "type": "function",
                        "vendor": "vendor_id",
                        "name": "name"}
|
||||
|
||||
|
||||
class ResourceTracker(object):
    """Agent helper class for keeping track of resource usage as instances
    are built and destroyed.
    """

    def __init__(self, host, cond_api):
        # FIXME (Shaohe) local cache for Accelerator.
        # Will fix it in next release.
        self.fpgas = None
        self.host = host
        self.conductor_api = cond_api
        self.fpga_driver = FPGADriver()

    @utils.synchronized(AGENT_RESOURCE_SEMAPHORE)
    def claim(self, context):
        """Placeholder for claiming accelerator resources (not implemented)."""
        pass

    def _fpga_compare_and_update(self, host_dev, accelerator):
        """Sync a deployable record with the driver-reported device.

        :param host_dev: device dict reported by the FPGA driver.
        :param accelerator: existing deployable record to update in place.
        :returns: True when any mapped field differed and was updated.
        """
        need_updated = False
        for k, v in DEPLOYABLE_HOST_MAPS.items():
            if accelerator[k] != host_dev[v]:
                need_updated = True
                accelerator[k] = host_dev[v]
        return need_updated

    def _gen_deployable_from_host_dev(self, host_dev):
        """Build a new deployable dict from a driver-reported device."""
        dep = {}
        for k, v in DEPLOYABLE_HOST_MAPS.items():
            dep[k] = host_dev[v]
        dep["host"] = self.host
        dep["version"] = DEPLOYABLE_VERSION
        dep["availability"] = "free"
        dep["uuid"] = uuidutils.generate_uuid()
        return dep

    @utils.synchronized(AGENT_RESOURCE_SEMAPHORE)
    def update_usage(self, context):
        """Update the resource usage and stats after a change in an
        instance
        """
        def create_deployable(fpgas, bdf, parent_uuid=None):
            fpga = fpgas[bdf]
            dep = self._gen_deployable_from_host_dev(fpga)
            # if parent_uuid:
            dep["parent_uuid"] = parent_uuid
            obj_dep = objects.Deployable(context, **dep)
            new_dep = self.conductor_api.deployable_create(context, obj_dep)
            return new_dep

        # NOTE(Shaohe Feng) need more agreement on how to keep consistency.
        fpgas = self._get_fpga_devices()
        bdfs = set(fpgas.keys())
        deployables = self.conductor_api.deployable_get_by_host(
            context, self.host)

        # NOTE(Shaohe Feng) when no "pcie_address" in deployable?
        accls = dict([(v["pcie_address"], v) for v in deployables])
        accl_bdfs = set(accls.keys())

        # Firstly update
        for mutual in accl_bdfs & bdfs:
            accl = accls[mutual]
            if self._fpga_compare_and_update(fpgas[mutual], accl):
                try:
                    self.conductor_api.deployable_update(context, accl)
                except RemoteError as e:
                    LOG.error(e)
        # Add
        new = bdfs - accl_bdfs
        new_pf = set([n for n in new if fpgas[n]["function"] == "pf"])
        for n in new_pf:
            pf_dep = create_deployable(fpgas, n)
            accls[n] = pf_dep
            sub_vf = set()
            # BUG FIX: membership must be tested on the device dict, not on
            # the bdf string ``n`` (mirrors form_dict's "regions" check).
            if "regions" in fpgas[n]:
                sub_vf = set([sub["devices"] for sub in fpgas[n]["regions"]])
            for vf in sub_vf & new:
                # BUG FIX: parent every VF to the PF's uuid; rebinding the
                # same variable here used to chain each VF to the previous
                # one instead of the PF.
                vf_dep = create_deployable(fpgas, vf, pf_dep["uuid"])
                accls[vf] = vf_dep
                new.remove(vf)
        for n in new - new_pf:
            p_bdf = fpgas[n]["parent_devices"]
            p_accl = accls[p_bdf]
            p_uuid = p_accl["uuid"]
            new_dep = create_deployable(fpgas, n, p_uuid)

        # Delete
        for obsolete in accl_bdfs - bdfs:
            try:
                self.conductor_api.deployable_delete(context, accls[obsolete])
            except RemoteError as e:
                LOG.error(e)
            del accls[obsolete]

    def _get_fpga_devices(self):
        """Return all FPGA devices (including region sub-devices), keyed by
        their pcie address as reported under the "devices" key.
        """
        def form_dict(devices, fpgas):
            for v in devices:
                fpgas[v["devices"]] = v
                if "regions" in v:
                    form_dict(v["regions"], fpgas)

        fpgas = {}
        vendors = self.fpga_driver.discover_vendors()
        for v in vendors:
            driver = self.fpga_driver.create(v)
            form_dict(driver.discover(), fpgas)
        return fpgas
|
|
@ -16,12 +16,16 @@
|
|||
"""Utilities and helper functions."""
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_concurrency import lockutils
|
||||
import six
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
# Decorator factory: synchronized(name) serializes callers holding the same
# lock name, with external lock files prefixed 'cyborg-'.
synchronized = lockutils.synchronized_with_prefix('cyborg-')
|
||||
|
||||
|
||||
def safe_rstrip(value, chars=None):
|
||||
"""Removes trailing characters from a string if that does not make it empty
|
||||
|
||||
|
|
Loading…
Reference in New Issue