# Source code for apache_beam.yaml.yaml_enrichment
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any
from typing import Optional
import apache_beam as beam
from apache_beam.yaml import options
try:
  from apache_beam.transforms.enrichment import Enrichment
  from apache_beam.transforms.enrichment_handlers.bigquery import BigQueryEnrichmentHandler
  from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
  from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import VertexAIFeatureStoreEnrichmentHandler
except ImportError:
  Enrichment = None  # type: ignore
  BigQueryEnrichmentHandler = None  # type: ignore
  BigTableEnrichmentHandler = None  # type: ignore
  VertexAIFeatureStoreEnrichmentHandler = None  # type: ignore
try:
  from apache_beam.transforms.enrichment_handlers.feast_feature_store import FeastFeatureStoreEnrichmentHandler
except ImportError:
  FeastFeatureStoreEnrichmentHandler = None  # type: ignore
# [docs]
@beam.ptransform.ptransform_fn
def enrichment_transform(
    pcoll,
    enrichment_handler: str,
    handler_config: dict[str, Any],
    timeout: Optional[float] = 30):
  # pylint: disable=line-too-long
  """
    The Enrichment transform allows one to dynamically enhance elements in a
    pipeline by performing key-value lookups against external services like
    APIs or databases.

    Example using BigTable: ::

        - type: Enrichment
          config:
            enrichment_handler: 'BigTable'
            handler_config:
              project_id: 'apache-beam-testing'
              instance_id: 'beam-test'
              table_id: 'bigtable-enrichment-test'
              row_key: 'product_id'
            timeout: 30

    For more information on Enrichment, see the [Beam docs](
    https://beam.apache.org/documentation/transforms/python/elementwise/enrichment/).

    Args:
        enrichment_handler (str): Specifies the source from where data needs
          to be extracted into the pipeline for enriching data. One of
          "BigQuery", "BigTable", "FeastFeatureStore" or "VertexAIFeatureStore".
        handler_config (dict): Specifies the parameters for the respective
          enrichment_handler as a YAML/JSON mapping; its entries are passed as
          keyword arguments to the handler's constructor. To see the full set
          of handler_config parameters, see their corresponding doc pages:

            - [BigQueryEnrichmentHandler](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigquery.html#apache_beam.transforms.enrichment_handlers.bigquery.BigQueryEnrichmentHandler)
            - [BigTableEnrichmentHandler](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigtable.html#apache_beam.transforms.enrichment_handlers.bigtable.BigTableEnrichmentHandler)
            - [FeastFeatureStoreEnrichmentHandler](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.feast_feature_store.html#apache_beam.transforms.enrichment_handlers.feast_feature_store.FeastFeatureStoreEnrichmentHandler)
            - [VertexAIFeatureStoreEnrichmentHandler](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store.html#apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store.VertexAIFeatureStoreEnrichmentHandler)
        timeout (float): Timeout for source requests in seconds. Defaults to 30
          seconds.

    Raises:
        ValueError: If enrichment_handler is not one of the supported names,
          or if the optional dependencies needed by the requested handler
          are not installed.
    """
  # Delegated to YamlOptions; the exact gating rules live in
  # apache_beam.yaml.options.
  options.YamlOptions.check_enabled(pcoll.pipeline, 'Enrichment')

  # Entries are None when the corresponding optional import at module load
  # failed; only the keys are needed to validate the requested name.
  handler_map = {
      'BigQuery': BigQueryEnrichmentHandler,
      'BigTable': BigTableEnrichmentHandler,
      'FeastFeatureStore': FeastFeatureStoreEnrichmentHandler,
      'VertexAIFeatureStore': VertexAIFeatureStoreEnrichmentHandler
  }

  # Validate the name before looking at dependencies so that a typo is
  # reported as such rather than as a missing-package problem.
  if enrichment_handler not in handler_map:
    raise ValueError(
        f"Unknown enrichment source: {enrichment_handler}. "
        f"Expected one of: {', '.join(handler_map)}.")

  # Enrichment and the BigQuery/BigTable/VertexAI handlers are imported
  # together, so a missing Enrichment means none of them are usable.
  if Enrichment is None:
    raise ValueError(
        f"gcp dependencies not installed. Cannot use {enrichment_handler} "
        f"handler. Please install using 'pip install apache-beam[gcp]'.")

  # The Feast handler is imported separately and may be absent even when the
  # rest of the enrichment stack is available.
  if (enrichment_handler == 'FeastFeatureStore' and
      FeastFeatureStoreEnrichmentHandler is None):
    raise ValueError(
        "FeastFeatureStore handler requires 'feast' package to be installed. " +
        "Please install using 'pip install feast[gcp]' and try again.")

  handler = handler_map[enrichment_handler](**handler_config)
  return pcoll | Enrichment(source_handler=handler, timeout=timeout)