Source code for apache_beam.yaml.yaml_enrichment
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any
from typing import Dict
from typing import Optional
import apache_beam as beam
from apache_beam.yaml import options
try:
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigquery import BigQueryEnrichmentHandler
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import VertexAIFeatureStoreEnrichmentHandler
except ImportError:
Enrichment = None # type: ignore
BigQueryEnrichmentHandler = None # type: ignore
BigTableEnrichmentHandler = None # type: ignore
VertexAIFeatureStoreEnrichmentHandler = None # type: ignore
try:
from apache_beam.transforms.enrichment_handlers.feast_feature_store import FeastFeatureStoreEnrichmentHandler
except ImportError:
FeastFeatureStoreEnrichmentHandler = None # type: ignore
[docs]
@beam.ptransform.ptransform_fn
def enrichment_transform(
pcoll,
enrichment_handler: str,
handler_config: Dict[str, Any],
timeout: Optional[float] = 30):
# pylint: disable=line-too-long
"""
The Enrichment transform allows one to dynamically enhance elements in a
pipeline by performing key-value lookups against external services like
APIs or databases.
Example using BigTable: ::
- type: Enrichment
config:
enrichment_handler: 'BigTable'
handler_config:
project_id: 'apache-beam-testing'
instance_id: 'beam-test'
table_id: 'bigtable-enrichment-test'
row_key: 'product_id'
timeout: 30
For more information on Enrichment, see the [Beam docs](
https://beam.apache.org/documentation/transforms/python/elementwise/enrichment/).
Args:
enrichment_handler (str): Specifies the source from where data needs
to be extracted into the pipeline for enriching data. One of
"BigQuery", "BigTable", "FeastFeatureStore" or "VertexAIFeatureStore".
handler_config (str): Specifies the parameters for the respective
enrichment_handler in a YAML/JSON format. To see the full set of
handler_config parameters, see their corresponding doc pages:
- [BigQueryEnrichmentHandler](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigquery.html#apache_beam.transforms.enrichment_handlers.bigquery.BigQueryEnrichmentHandler)
- [BigTableEnrichmentHandler](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.bigtable.html#apache_beam.transforms.enrichment_handlers.bigtable.BigTableEnrichmentHandler)
- [FeastFeatureStoreEnrichmentHandler](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.feast_feature_store.html#apache_beam.transforms.enrichment_handlers.feast_feature_store.FeastFeatureStoreEnrichmentHandler)
- [VertexAIFeatureStoreEnrichmentHandler](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store.html#apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store.VertexAIFeatureStoreEnrichmentHandler)
timeout (float): Timeout for source requests in seconds. Defaults to 30
seconds.
"""
options.YamlOptions.check_enabled(pcoll.pipeline, 'Enrichment')
if not Enrichment:
raise ValueError(
f"gcp dependencies not installed. Cannot use {enrichment_handler} "
f"handler. Please install using 'pip install apache-beam[gcp]'.")
if (enrichment_handler == 'FeastFeatureStore' and
not FeastFeatureStoreEnrichmentHandler):
raise ValueError(
"FeastFeatureStore handler requires 'feast' package to be installed. " +
"Please install using 'pip install feast[gcp]' and try again.")
handler_map = {
'BigQuery': BigQueryEnrichmentHandler,
'BigTable': BigTableEnrichmentHandler,
'FeastFeatureStore': FeastFeatureStoreEnrichmentHandler,
'VertexAIFeatureStore': VertexAIFeatureStoreEnrichmentHandler
}
if enrichment_handler not in handler_map:
raise ValueError(f"Unknown enrichment source: {enrichment_handler}")
handler = handler_map[enrichment_handler](**handler_config)
return pcoll | Enrichment(source_handler=handler, timeout=timeout)