Enrichment with Google Cloud Vertex AI Feature Store
In Apache Beam 2.55.0 and later versions, the enrichment transform includes a built-in enrichment handler for Vertex AI Feature Store.
The following examples demonstrate how to create pipelines that use the enrichment transform with the VertexAIFeatureStoreEnrichmentHandler handler and the VertexAIFeatureStoreLegacyEnrichmentHandler handler.
Example 1: Enrichment with Vertex AI Feature Store
The precomputed feature values stored in Vertex AI Feature Store use the following format:
user_id | age | gender | state | country |
---|---|---|---|---|
21422 | 12 | 0 | 0 | 0 |
2963 | 12 | 1 | 1 | 1 |
20592 | 12 | 1 | 2 | 2 |
76538 | 12 | 1 | 3 | 0 |
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store \
    import VertexAIFeatureStoreEnrichmentHandler

project_id = 'apache-beam-testing'
location = 'us-central1'
api_endpoint = f"{location}-aiplatform.googleapis.com"

# Input rows to enrich; user_id is the lookup key.
data = [
    beam.Row(user_id='2963', product_id=14235, sale_price=15.0),
    beam.Row(user_id='21422', product_id=11203, sale_price=12.0),
    beam.Row(user_id='20592', product_id=8579, sale_price=9.0),
]

# Handler that looks up features in the "users" feature view by row_key.
vertex_ai_handler = VertexAIFeatureStoreEnrichmentHandler(
    project=project_id,
    location=location,
    api_endpoint=api_endpoint,
    feature_store_name="vertexai_enrichment_example",
    feature_view_name="users",
    row_key="user_id",
)

with beam.Pipeline() as p:
    _ = (
        p
        | "Create" >> beam.Create(data)
        | "Enrich W/ Vertex AI" >> Enrichment(vertex_ai_handler)
        | "Print" >> beam.Map(print))
Output:
Row(user_id='2963', product_id=14235, sale_price=15.0, age=12.0, state='1', gender='1', country='1')
Row(user_id='21422', product_id=11203, sale_price=12.0, age=12.0, state='0', gender='0', country='0')
Row(user_id='20592', product_id=8579, sale_price=9.0, age=12.0, state='2', gender='1', country='2')
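To make the output above easier to read, the following is a minimal conceptual sketch, in plain Python with no Beam or Google Cloud dependency, of the lookup-and-merge step that enrichment performs for each row. It is not the handler's implementation. The `FEATURE_VIEW` dictionary and the `enrich_row` function are hypothetical names introduced for illustration, the feature values are copied from the example output, and the choice to keep the input value when a field name collides is an assumption of this sketch.

```python
from typing import Any, Dict, Mapping

# Hypothetical in-memory stand-in for the "users" feature view,
# keyed by user_id. Values mirror the example output above.
FEATURE_VIEW: Dict[str, Dict[str, Any]] = {
    "21422": {"age": 12.0, "state": "0", "gender": "0", "country": "0"},
    "2963": {"age": 12.0, "state": "1", "gender": "1", "country": "1"},
    "20592": {"age": 12.0, "state": "2", "gender": "1", "country": "2"},
}


def enrich_row(
    request: Mapping[str, Any],
    features_by_key: Mapping[str, Mapping[str, Any]],
    row_key: str,
) -> Dict[str, Any]:
    """Look up features by the value of `row_key` and merge them into the row.

    Fields already present in the request are kept unchanged (an assumption
    of this sketch); rows with no matching key are returned without features.
    """
    merged = dict(request)
    features = features_by_key.get(request[row_key], {})
    for name, value in features.items():
        merged.setdefault(name, value)
    return merged


if __name__ == "__main__":
    sale = {"user_id": "2963", "product_id": 14235, "sale_price": 15.0}
    print(enrich_row(sale, FEATURE_VIEW, row_key="user_id"))
```

In the pipeline, the `row_key="user_id"` argument plays the role of the `row_key` parameter here: it names the input field whose value is used to find the matching feature record.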
Example 2: Enrichment with Vertex AI Feature Store (Legacy)
The precomputed feature values stored in Vertex AI Feature Store (Legacy) use the following format:
entity_id | title | genres |
---|---|---|
movie_01 | The Shawshank Redemption | Drama |
movie_02 | The Shining | Horror |
movie_04 | The Dark Knight | Action |
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store \
    import VertexAIFeatureStoreLegacyEnrichmentHandler

project_id = 'apache-beam-testing'
location = 'us-central1'
api_endpoint = f"{location}-aiplatform.googleapis.com"

# Input rows to enrich; entity_id is the lookup key.
data = [
    beam.Row(entity_id="movie_01", title='The Shawshank Redemption'),
    beam.Row(entity_id="movie_02", title="The Shining"),
    beam.Row(entity_id="movie_04", title='The Dark Knight'),
]

# Handler that fetches the listed feature_ids for each entity by row_key.
vertex_ai_handler = VertexAIFeatureStoreLegacyEnrichmentHandler(
    project=project_id,
    location=location,
    api_endpoint=api_endpoint,
    entity_type_id='movies',
    feature_store_id="movie_prediction_unique",
    feature_ids=["title", "genres"],
    row_key="entity_id",
)

with beam.Pipeline() as p:
    _ = (
        p
        | "Create" >> beam.Create(data)
        | "Enrich W/ Vertex AI" >> Enrichment(vertex_ai_handler)
        | "Print" >> beam.Map(print))
Output:
Related transforms
Not applicable.
Last updated on 2025/01/19