Source code for

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

# pytype: skip-file

from __future__ import annotations

import logging
import threading
from typing import Any
from typing import Dict
from typing import Iterable
from typing import Optional
from typing import Sequence
from typing import Tuple

import numpy as np

from import FileSystems
from import ModelHandler
from import PredictionResult
from apache_beam.utils.annotations import experimental

LOGGER = logging.getLogger("TensorRTEngineHandlerNumPy")
# This try/catch block allows users to submit jobs from a machine without
# GPU and other dependencies (tensorrt, cuda, etc.) at job submission time.
  import tensorrt as trt
  TRT_LOGGER = trt.Logger(trt.Logger.INFO)
  trt.init_libnvinfer_plugins(TRT_LOGGER, namespace="")'tensorrt module successfully imported.')
except ModuleNotFoundError:
  msg = 'tensorrt module was not found. This is ok as long as the specified ' \
    'runner has tensorrt dependencies installed.'

def _load_engine(engine_path):
  import tensorrt as trt
  file =, 'rb')
  runtime = trt.Runtime(TRT_LOGGER)
  engine = runtime.deserialize_cuda_engine(
  assert engine
  return engine

def _load_onnx(onnx_path):
  import tensorrt as trt
  builder = trt.Builder(TRT_LOGGER)
  network = builder.create_network(
      flags=1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
  parser = trt.OnnxParser(network, TRT_LOGGER)
  with as f:
    if not parser.parse(
      LOGGER.error("Failed to load ONNX file: %s", onnx_path)
      for error in range(parser.num_errors):
      raise ValueError(f"Failed to load ONNX file: {onnx_path}")
  return network, builder

def _build_engine(network, builder):
  import tensorrt as trt
  config = builder.create_builder_config()
  runtime = trt.Runtime(TRT_LOGGER)
  plan = builder.build_serialized_network(network, config)
  engine = runtime.deserialize_cuda_engine(plan)
  return engine

def _assign_or_fail(args):
  """CUDA error checking."""
  from cuda import cuda
  err, ret = args[0], args[1:]
  if isinstance(err, cuda.CUresult):
    if err != cuda.CUresult.CUDA_SUCCESS:
      raise RuntimeError("Cuda Error: {}".format(err))
    raise RuntimeError("Unknown error type: {}".format(err))
  # Special case so that no unpacking is needed at call-site.
  if len(ret) == 1:
    return ret[0]
  return ret

[docs]class TensorRTEngine: def __init__(self, engine: trt.ICudaEngine): """Implementation of the TensorRTEngine class which handles allocations associated with TensorRT engine. Example Usage:: TensorRTEngine(engine) Args: engine: trt.ICudaEngine object that contains TensorRT engine """ from cuda import cuda import tensorrt as trt self.engine = engine self.context = engine.create_execution_context() self.context_lock = threading.RLock() self.inputs = [] self.outputs = [] self.gpu_allocations = [] self.cpu_allocations = [] """Setup I/O bindings.""" for i in range(self.engine.num_bindings): name = self.engine.get_binding_name(i) dtype = self.engine.get_binding_dtype(i) shape = self.engine.get_binding_shape(i) size = trt.volume(shape) * dtype.itemsize allocation = _assign_or_fail(cuda.cuMemAlloc(size)) binding = { 'index': i, 'name': name, 'dtype': np.dtype(trt.nptype(dtype)), 'shape': list(shape), 'allocation': allocation, 'size': size } self.gpu_allocations.append(allocation) if self.engine.binding_is_input(i): self.inputs.append(binding) else: self.outputs.append(binding) assert self.context assert len(self.inputs) > 0 assert len(self.outputs) > 0 assert len(self.gpu_allocations) > 0 for output in self.outputs: self.cpu_allocations.append(np.zeros(output['shape'], output['dtype'])) # Create CUDA Stream. = _assign_or_fail(cuda.cuStreamCreate(0))
[docs] def get_engine_attrs(self): """Returns TensorRT engine attributes.""" return ( self.engine, self.context, self.context_lock, self.inputs, self.outputs, self.gpu_allocations, self.cpu_allocations,
[docs]@experimental(extra_message="No backwards-compatibility guarantees.") class TensorRTEngineHandlerNumPy(ModelHandler[np.ndarray, PredictionResult, TensorRTEngine]): def __init__(self, min_batch_size: int, max_batch_size: int, **kwargs): """Implementation of the ModelHandler interface for TensorRT. Example Usage:: pcoll | RunInference( TensorRTEngineHandlerNumPy( min_batch_size=1, max_batch_size=1, engine_path="my_uri")) **NOTE:** This API and its implementation are under development and do not provide backward compatibility guarantees. Args: min_batch_size: minimum accepted batch size. max_batch_size: maximum accepted batch size. kwargs: Additional arguments like 'engine_path' and 'onnx_path' are currently supported. See for details """ self.min_batch_size = min_batch_size self.max_batch_size = max_batch_size if 'engine_path' in kwargs: self.engine_path = kwargs.get('engine_path') elif 'onnx_path' in kwargs: self.onnx_path = kwargs.get('onnx_path')
[docs] def batch_elements_kwargs(self): """Sets min_batch_size and max_batch_size of a TensorRT engine.""" return { 'min_batch_size': self.min_batch_size, 'max_batch_size': self.max_batch_size }
[docs] def load_model(self) -> TensorRTEngine: """Loads and initializes a TensorRT engine for processing.""" engine = _load_engine(self.engine_path) return TensorRTEngine(engine)
[docs] def load_onnx(self) -> Tuple[trt.INetworkDefinition, trt.Builder]: """Loads and parses an onnx model for processing.""" return _load_onnx(self.onnx_path)
[docs] def build_engine( self, network: trt.INetworkDefinition, builder: trt.Builder) -> TensorRTEngine: """Build an engine according to parsed/created network.""" engine = _build_engine(network, builder) return TensorRTEngine(engine)
[docs] def run_inference( self, batch: Sequence[np.ndarray], engine: TensorRTEngine, inference_args: Optional[Dict[str, Any]] = None ) -> Iterable[PredictionResult]: """ Runs inferences on a batch of Tensors and returns an Iterable of TensorRT Predictions. Args: batch: A np.ndarray or a np.ndarray that represents a concatenation of multiple arrays as a batch. engine: A TensorRT engine. inference_args: Any additional arguments for an inference that are not applicable to TensorRT. Returns: An Iterable of type PredictionResult. """ from cuda import cuda ( engine, context, context_lock, inputs, outputs, gpu_allocations, cpu_allocations, stream) = engine.get_engine_attrs() # Process I/O and execute the network with context_lock: _assign_or_fail( cuda.cuMemcpyHtoDAsync( inputs[0]['allocation'], np.ascontiguousarray(batch), inputs[0]['size'], stream)) context.execute_async_v2(gpu_allocations, stream) for output in range(len(cpu_allocations)): _assign_or_fail( cuda.cuMemcpyDtoHAsync( cpu_allocations[output], outputs[output]['allocation'], outputs[output]['size'], stream)) _assign_or_fail(cuda.cuStreamSynchronize(stream)) return [ PredictionResult( x, [prediction[idx] for prediction in cpu_allocations]) for idx, x in enumerate(batch) ]
[docs] def get_num_bytes(self, batch: Sequence[np.ndarray]) -> int: """ Returns: The number of bytes of data for a batch of Tensors. """ return sum((np_array.itemsize for np_array in batch))
[docs] def get_metrics_namespace(self) -> str: """ Returns a namespace for metrics collected by the RunInference transform. """ return 'RunInferenceTensorRT'