aggent support resource tracker for FPGA
use PeriodicTasks to track resources. Info: https://docs.openstack.org/oslo.service/latest/reference/periodic_task.html Aggent is a bridge for FPGA driver and conductor. It gets the lastest informations and update them to conductor. This patch we call conductor API directly instead of object remotable method. Will support it in the next version. This patch depends on the FPGA driver patch and conductor patch. How to test this patch: 1. apply the fpga driver patch https://review.openstack.org/#/c/531129 2. generate a sysfs fpga data $ ./cyborg/tests/unit/accelerator/drivers/fpga/intel/prepare_test_data.py 3. Change the SYS_FPGA in cyborg/accelerator/drivers/fpga/intel/sysinfo.py to "/tmp/sys/class/fpga" 4. change the SYS_FPGA_PATH in cyborg/accelerator/drivers/fpga/utils.py to "/tmp/sys/class/fpga" 5. restart agent. The unittest for this patch will be in a separated patch, and comes out ASAP. Co-Authored-By: Dolpher Du <Dolpher.Du@intel.com> Change-Id: I5e487cf939aa65d0fc79399ddd5d1337a8c2fa98
This commit is contained in:
parent
b22761ab0d
commit
b96d0e2570
|
@ -14,24 +14,42 @@
|
|||
# under the License.
|
||||
|
||||
import oslo_messaging as messaging
|
||||
from oslo_service import periodic_task
|
||||
|
||||
from cyborg.accelerator.drivers.fpga.base import FPGADriver
|
||||
from cyborg.agent.resource_tracker import ResourceTracker
|
||||
from cyborg.conductor import rpcapi as cond_api
|
||||
from cyborg.conf import CONF
|
||||
|
||||
|
||||
class AgentManager(object):
|
||||
class AgentManager(periodic_task.PeriodicTasks):
|
||||
"""Cyborg Agent manager main class."""
|
||||
|
||||
RPC_API_VERSION = '1.0'
|
||||
target = messaging.Target(version=RPC_API_VERSION)
|
||||
|
||||
def __init__(self, topic, host=None):
|
||||
super(AgentManager, self).__init__()
|
||||
super(AgentManager, self).__init__(CONF)
|
||||
self.topic = topic
|
||||
self.host = host or CONF.host
|
||||
self.fpga_driver = FPGADriver()
|
||||
self.cond_api = cond_api.ConductorAPI()
|
||||
self._rt = ResourceTracker(host, self.cond_api)
|
||||
|
||||
def periodic_tasks(self, context, raise_on_error=False):
|
||||
pass
|
||||
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
|
||||
|
||||
def hardware_list(self, context, values):
|
||||
"""List installed hardware."""
|
||||
pass
|
||||
|
||||
def fpga_program(self, context, accelerator, image):
|
||||
""" Program a FPGA regoin, image can be a url or local file"""
|
||||
# TODO (Shaohe Feng) Get image from glance.
|
||||
# And add claim and rollback logical.
|
||||
raise NotImplementedError()
|
||||
|
||||
@periodic_task.periodic_task(run_immediately=True)
|
||||
def update_available_resource(self, context, startup=True):
|
||||
"""update all kinds of accelerator resources from their drivers."""
|
||||
self._rt.update_usage(context)
|
||||
|
|
|
@ -0,0 +1,153 @@
|
|||
# Copyright (c) 2018 Intel.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Track resources like FPGA GPU and QAT for a host. Provides the
|
||||
conductor with useful information about availability through the accelerator
|
||||
model.
|
||||
"""
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_messaging.rpc.client import RemoteError
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from cyborg.accelerator.drivers.fpga.base import FPGADriver
|
||||
from cyborg.common import utils
|
||||
from cyborg import objects
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
AGENT_RESOURCE_SEMAPHORE = "agent_resources"
|
||||
|
||||
DEPLOYABLE_VERSION = "1.0"
|
||||
|
||||
# need to change the driver field name
|
||||
DEPLOYABLE_HOST_MAPS = {"assignable": "assignable",
|
||||
"pcie_address": "devices",
|
||||
"board": "product_id",
|
||||
"type": "function",
|
||||
"vendor": "vendor_id",
|
||||
"name": "name"}
|
||||
|
||||
|
||||
class ResourceTracker(object):
|
||||
"""Agent helper class for keeping track of resource usage as instances
|
||||
are built and destroyed.
|
||||
"""
|
||||
|
||||
def __init__(self, host, cond_api):
|
||||
# FIXME (Shaohe) local cache for Accelerator.
|
||||
# Will fix it in next release.
|
||||
self.fpgas = None
|
||||
self.host = host
|
||||
self.conductor_api = cond_api
|
||||
self.fpga_driver = FPGADriver()
|
||||
|
||||
@utils.synchronized(AGENT_RESOURCE_SEMAPHORE)
|
||||
def claim(self, context):
|
||||
pass
|
||||
|
||||
def _fpga_compare_and_update(self, host_dev, acclerator):
|
||||
need_updated = False
|
||||
for k, v in DEPLOYABLE_HOST_MAPS.items():
|
||||
if acclerator[k] != host_dev[v]:
|
||||
need_updated = True
|
||||
acclerator[k] = host_dev[v]
|
||||
return need_updated
|
||||
|
||||
def _gen_deployable_from_host_dev(self, host_dev):
|
||||
dep = {}
|
||||
for k, v in DEPLOYABLE_HOST_MAPS.items():
|
||||
dep[k] = host_dev[v]
|
||||
dep["host"] = self.host
|
||||
dep["version"] = DEPLOYABLE_VERSION
|
||||
dep["availability"] = "free"
|
||||
dep["uuid"] = uuidutils.generate_uuid()
|
||||
return dep
|
||||
|
||||
@utils.synchronized(AGENT_RESOURCE_SEMAPHORE)
|
||||
def update_usage(self, context):
|
||||
"""Update the resource usage and stats after a change in an
|
||||
instance
|
||||
"""
|
||||
def create_deployable(fpgas, bdf, parent_uuid=None):
|
||||
fpga = fpgas[bdf]
|
||||
dep = self._gen_deployable_from_host_dev(fpga)
|
||||
# if parent_uuid:
|
||||
dep["parent_uuid"] = parent_uuid
|
||||
obj_dep = objects.Deployable(context, **dep)
|
||||
new_dep = self.conductor_api.deployable_create(context, obj_dep)
|
||||
return new_dep
|
||||
|
||||
# NOTE(Shaohe Feng) need more agreement on how to keep consistency.
|
||||
fpgas = self._get_fpga_devices()
|
||||
bdfs = set(fpgas.keys())
|
||||
deployables = self.conductor_api.deployable_get_by_host(
|
||||
context, self.host)
|
||||
|
||||
# NOTE(Shaohe Feng) when no "pcie_address" in deployable?
|
||||
accls = dict([(v["pcie_address"], v) for v in deployables])
|
||||
accl_bdfs = set(accls.keys())
|
||||
|
||||
# Firstly update
|
||||
for mutual in accl_bdfs & bdfs:
|
||||
accl = accls[mutual]
|
||||
if self._fpga_compare_and_update(fpgas[mutual], accl):
|
||||
try:
|
||||
self.conductor_api.deployable_update(context, accl)
|
||||
except RemoteError as e:
|
||||
LOG.error(e)
|
||||
# Add
|
||||
new = bdfs - accl_bdfs
|
||||
new_pf = set([n for n in new if fpgas[n]["function"] == "pf"])
|
||||
for n in new_pf:
|
||||
new_dep = create_deployable(fpgas, n)
|
||||
accls[n] = new_dep
|
||||
sub_vf = set()
|
||||
if "regions" in n:
|
||||
sub_vf = set([sub["devices"] for sub in fpgas[n]["regions"]])
|
||||
for vf in sub_vf & new:
|
||||
new_dep = create_deployable(fpgas, vf, new_dep["uuid"])
|
||||
accls[vf] = new_dep
|
||||
new.remove(vf)
|
||||
for n in new - new_pf:
|
||||
p_bdf = fpgas[n]["parent_devices"]
|
||||
p_accl = accls[p_bdf]
|
||||
p_uuid = p_accl["uuid"]
|
||||
new_dep = create_deployable(fpgas, n, p_uuid)
|
||||
|
||||
# Delete
|
||||
for obsolete in accl_bdfs - bdfs:
|
||||
try:
|
||||
self.conductor_api.deployable_delete(context, accls[obsolete])
|
||||
except RemoteError as e:
|
||||
LOG.error(e)
|
||||
del accls[obsolete]
|
||||
|
||||
def _get_fpga_devices(self):
|
||||
|
||||
def form_dict(devices, fpgas):
|
||||
for v in devices:
|
||||
fpgas[v["devices"]] = v
|
||||
if "regions" in v:
|
||||
form_dict(v["regions"], fpgas)
|
||||
|
||||
fpgas = {}
|
||||
vendors = self.fpga_driver.discover_vendors()
|
||||
for v in vendors:
|
||||
driver = self.fpga_driver.create(v)
|
||||
form_dict(driver.discover(), fpgas)
|
||||
return fpgas
|
|
@ -16,12 +16,16 @@
|
|||
"""Utilities and helper functions."""
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_concurrency import lockutils
|
||||
import six
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
synchronized = lockutils.synchronized_with_prefix('cyborg-')
|
||||
|
||||
|
||||
def safe_rstrip(value, chars=None):
|
||||
"""Removes trailing characters from a string if that does not make it empty
|
||||
|
||||
|
|
Loading…
Reference in New Issue