Source code for apache_beam.ml.gcp.recommendations_ai

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A connector for sending API requests to the GCP Recommendations AI
API (https://cloud.google.com/recommendations).
"""

from __future__ import absolute_import

from typing import Sequence
from typing import Tuple

from google.api_core.retry import Retry

from apache_beam import pvalue
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.transforms import DoFn
from apache_beam.transforms import ParDo
from apache_beam.transforms import PTransform
from apache_beam.transforms.util import GroupIntoBatches
from cachetools.func import ttl_cache

# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
  from google.cloud import recommendationengine
except ImportError:
  raise ImportError(
      'Google Cloud Recommendation AI not supported for this execution '
      'environment (could not import google.cloud.recommendationengine).')
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports

__all__ = [
    'CreateCatalogItem',
    'WriteUserEvent',
    'ImportCatalogItems',
    'ImportUserEvents',
    'PredictUserEvent'
]

FAILED_CATALOG_ITEMS = "failed_catalog_items"


@ttl_cache(maxsize=128, ttl=3600)
def get_recommendation_prediction_client():
  """Returns a Recommendation AI - Prediction Service client."""
  _client = recommendationengine.PredictionServiceClient()
  return _client


@ttl_cache(maxsize=128, ttl=3600)
def get_recommendation_catalog_client():
  """Returns a Recommendation AI - Catalog Service client."""
  _client = recommendationengine.CatalogServiceClient()
  return _client


@ttl_cache(maxsize=128, ttl=3600)
def get_recommendation_user_event_client():
  """Returns a Recommendation AI - UserEvent Service client."""
  _client = recommendationengine.UserEventServiceClient()
  return _client


[docs]class CreateCatalogItem(PTransform): """Creates catalogitem information. The ``PTranform`` returns a PCollectionTuple with a PCollections of successfully and failed created CatalogItems. Example usage:: pipeline | CreateCatalogItem( project='example-gcp-project', catalog_name='my-catalog') """ def __init__( self, project: str = None, retry: Retry = None, timeout: float = 120, metadata: Sequence[Tuple[str, str]] = (), catalog_name: str = "default_catalog"): """Initializes a :class:`CreateCatalogItem` transform. Args: project (str): Optional. GCP project name in which the catalog data will be imported. retry: Optional. Designation of what errors, if any, should be retried. timeout (float): Optional. The amount of time, in seconds, to wait for the request to complete. metadata: Optional. Strings which should be sent along with the request as metadata. catalog_name (str): Optional. Name of the catalog. Default: 'default_catalog' """ self.project = project self.retry = retry self.timeout = timeout self.metadata = metadata self.catalog_name = catalog_name
[docs] def expand(self, pcoll): if self.project is None: self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project if self.project is None: raise ValueError( """GCP project name needs to be specified in "project" pipeline option""") return pcoll | ParDo( _CreateCatalogItemFn( self.project, self.retry, self.timeout, self.metadata, self.catalog_name))
class _CreateCatalogItemFn(DoFn): def __init__( self, project: str = None, retry: Retry = None, timeout: float = 120, metadata: Sequence[Tuple[str, str]] = (), catalog_name: str = None): self._client = None self.retry = retry self.timeout = timeout self.metadata = metadata self.parent = f"projects/{project}/locations/global/catalogs/{catalog_name}" self.counter = Metrics.counter(self.__class__, "api_calls") def setup(self): if self._client is None: self._client = get_recommendation_catalog_client() def process(self, element): catalog_item = recommendationengine.CatalogItem(element) request = recommendationengine.CreateCatalogItemRequest( parent=self.parent, catalog_item=catalog_item) try: created_catalog_item = self._client.create_catalog_item( request=request, retry=self.retry, timeout=self.timeout, metadata=self.metadata) self.counter.inc() yield recommendationengine.CatalogItem.to_dict(created_catalog_item) except Exception: yield pvalue.TaggedOutput( FAILED_CATALOG_ITEMS, recommendationengine.CatalogItem.to_dict(catalog_item))
[docs]class ImportCatalogItems(PTransform): """Imports catalogitems in bulk. The `PTransform` returns a PCollectionTuple with PCollections of successfully and failed imported CatalogItems. Example usage:: pipeline | ImportCatalogItems( project='example-gcp-project', catalog_name='my-catalog') """ def __init__( self, max_batch_size: int = 5000, project: str = None, retry: Retry = None, timeout: float = 120, metadata: Sequence[Tuple[str, str]] = (), catalog_name: str = "default_catalog"): """Initializes a :class:`ImportCatalogItems` transform Args: batch_size (int): Required. Maximum number of catalogitems per request. project (str): Optional. GCP project name in which the catalog data will be imported. retry: Optional. Designation of what errors, if any, should be retried. timeout (float): Optional. The amount of time, in seconds, to wait for the request to complete. metadata: Optional. Strings which should be sent along with the request as metadata. catalog_name (str): Optional. Name of the catalog. Default: 'default_catalog' """ self.max_batch_size = max_batch_size self.project = project self.retry = retry self.timeout = timeout self.metadata = metadata self.catalog_name = catalog_name
[docs] def expand(self, pcoll): if self.project is None: self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project if self.project is None: raise ValueError( 'GCP project name needs to be specified in "project" pipeline option') return ( pcoll | GroupIntoBatches.WithShardedKey(self.max_batch_size) | ParDo( _ImportCatalogItemsFn( self.project, self.retry, self.timeout, self.metadata, self.catalog_name)))
class _ImportCatalogItemsFn(DoFn): def __init__( self, project=None, retry=None, timeout=120, metadata=None, catalog_name=None): self._client = None self.retry = retry self.timeout = timeout self.metadata = metadata self.parent = f"projects/{project}/locations/global/catalogs/{catalog_name}" self.counter = Metrics.counter(self.__class__, "api_calls") def setup(self): if self._client is None: self.client = get_recommendation_catalog_client() def process(self, element): catalog_items = [recommendationengine.CatalogItem(e) for e in element[1]] catalog_inline_source = recommendationengine.CatalogInlineSource( {"catalog_items": catalog_items}) input_config = recommendationengine.InputConfig( catalog_inline_source=catalog_inline_source) request = recommendationengine.ImportCatalogItemsRequest( parent=self.parent, input_config=input_config) try: operation = self._client.import_catalog_items( request=request, retry=self.retry, timeout=self.timeout, metadata=self.metadata) self.counter.inc(len(catalog_items)) yield operation.result() except Exception: yield pvalue.TaggedOutput(FAILED_CATALOG_ITEMS, catalog_items)
[docs]class WriteUserEvent(PTransform): """Write user event information. The `PTransform` returns a PCollectionTuple with PCollections of successfully and failed written UserEvents. Example usage:: pipeline | WriteUserEvent( project='example-gcp-project', catalog_name='my-catalog', event_store='my_event_store') """ def __init__( self, project: str = None, retry: Retry = None, timeout: float = 120, metadata: Sequence[Tuple[str, str]] = (), catalog_name: str = "default_catalog", event_store: str = "default_event_store"): """Initializes a :class:`WriteUserEvent` transform. Args: project (str): Optional. GCP project name in which the catalog data will be imported. retry: Optional. Designation of what errors, if any, should be retried. timeout (float): Optional. The amount of time, in seconds, to wait for the request to complete. metadata: Optional. Strings which should be sent along with the request as metadata. catalog_name (str): Optional. Name of the catalog. Default: 'default_catalog' event_store (str): Optional. Name of the event store. Default: 'default_event_store' """ self.project = project self.retry = retry self.timeout = timeout self.metadata = metadata self.catalog_name = catalog_name self.event_store = event_store
[docs] def expand(self, pcoll): if self.project is None: self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project if self.project is None: raise ValueError( 'GCP project name needs to be specified in "project" pipeline option') return pcoll | ParDo( _WriteUserEventFn( self.project, self.retry, self.timeout, self.metadata, self.catalog_name, self.event_store))
class _WriteUserEventFn(DoFn): FAILED_USER_EVENTS = "failed_user_events" def __init__( self, project=None, retry=None, timeout=120, metadata=None, catalog_name=None, event_store=None): self._client = None self.retry = retry self.timeout = timeout self.metadata = metadata self.parent = f"projects/{project}/locations/global/catalogs/"\ f"{catalog_name}/eventStores/{event_store}" self.counter = Metrics.counter(self.__class__, "api_calls") def setup(self): if self._client is None: self._client = get_recommendation_user_event_client() def process(self, element): user_event = recommendationengine.UserEvent(element) request = recommendationengine.WriteUserEventRequest( parent=self.parent, user_event=user_event) try: created_user_event = self._client.write_user_event(request) self.counter.inc() yield recommendationengine.UserEvent.to_dict(created_user_event) except Exception: yield pvalue.TaggedOutput( self.FAILED_USER_EVENTS, recommendationengine.UserEvent.to_dict(user_event))
[docs]class ImportUserEvents(PTransform): """Imports userevents in bulk. The `PTransform` returns a PCollectionTuple with PCollections of successfully and failed imported UserEvents. Example usage:: pipeline | ImportUserEvents( project='example-gcp-project', catalog_name='my-catalog', event_store='my_event_store') """ def __init__( self, max_batch_size: int = 5000, project: str = None, retry: Retry = None, timeout: float = 120, metadata: Sequence[Tuple[str, str]] = (), catalog_name: str = "default_catalog", event_store: str = "default_event_store"): """Initializes a :class:`WriteUserEvent` transform. Args: batch_size (int): Required. Maximum number of catalogitems per request. project (str): Optional. GCP project name in which the catalog data will be imported. retry: Optional. Designation of what errors, if any, should be retried. timeout (float): Optional. The amount of time, in seconds, to wait for the request to complete. metadata: Optional. Strings which should be sent along with the request as metadata. catalog_name (str): Optional. Name of the catalog. Default: 'default_catalog' event_store (str): Optional. Name of the event store. Default: 'default_event_store' """ self.max_batch_size = max_batch_size self.project = project self.retry = retry self.timeout = timeout self.metadata = metadata self.catalog_name = catalog_name self.event_store = event_store
[docs] def expand(self, pcoll): if self.project is None: self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project if self.project is None: raise ValueError( 'GCP project name needs to be specified in "project" pipeline option') return ( pcoll | GroupIntoBatches.WithShardedKey(self.max_batch_size) | ParDo( _ImportUserEventsFn( self.project, self.retry, self.timeout, self.metadata, self.catalog_name, self.event_store)))
class _ImportUserEventsFn(DoFn): FAILED_USER_EVENTS = "failed_user_events" def __init__( self, project=None, retry=None, timeout=120, metadata=None, catalog_name=None, event_store=None): self._client = None self.retry = retry self.timeout = timeout self.metadata = metadata self.parent = f"projects/{project}/locations/global/catalogs/"\ f"{catalog_name}/eventStores/{event_store}" self.counter = Metrics.counter(self.__class__, "api_calls") def setup(self): if self._client is None: self.client = get_recommendation_user_event_client() def process(self, element): user_events = [recommendationengine.UserEvent(e) for e in element[1]] user_event_inline_source = recommendationengine.UserEventInlineSource( {"user_events": user_events}) input_config = recommendationengine.InputConfig( user_event_inline_source=user_event_inline_source) request = recommendationengine.ImportUserEventsRequest( parent=self.parent, input_config=input_config) try: operation = self._client.write_user_event(request) self.counter.inc(len(user_events)) yield recommendationengine.PredictResponse.to_dict(operation.result()) except Exception: yield pvalue.TaggedOutput(self.FAILED_USER_EVENTS, user_events)
[docs]class PredictUserEvent(PTransform): """Make a recommendation prediction. The `PTransform` returns a PCollection Example usage:: pipeline | PredictUserEvent( project='example-gcp-project', catalog_name='my-catalog', event_store='my_event_store', placement_id='recently_viewed_default') """ def __init__( self, project: str = None, retry: Retry = None, timeout: float = 120, metadata: Sequence[Tuple[str, str]] = (), catalog_name: str = "default_catalog", event_store: str = "default_event_store", placement_id: str = None): """Initializes a :class:`PredictUserEvent` transform. Args: project (str): Optional. GCP project name in which the catalog data will be imported. retry: Optional. Designation of what errors, if any, should be retried. timeout (float): Optional. The amount of time, in seconds, to wait for the request to complete. metadata: Optional. Strings which should be sent along with the request as metadata. catalog_name (str): Optional. Name of the catalog. Default: 'default_catalog' event_store (str): Optional. Name of the event store. Default: 'default_event_store' placement_id (str): Required. ID of the recommendation engine placement. This id is used to identify the set of models that will be used to make the prediction. """ self.project = project self.retry = retry self.timeout = timeout self.metadata = metadata self.placement_id = placement_id self.catalog_name = catalog_name self.event_store = event_store if placement_id is None: raise ValueError('placement_id must be specified') else: self.placement_id = placement_id
[docs] def expand(self, pcoll): if self.project is None: self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project if self.project is None: raise ValueError( 'GCP project name needs to be specified in "project" pipeline option') return pcoll | ParDo( _PredictUserEventFn( self.project, self.retry, self.timeout, self.metadata, self.catalog_name, self.event_store, self.placement_id))
class _PredictUserEventFn(DoFn): FAILED_PREDICTIONS = "failed_predictions" def __init__( self, project=None, retry=None, timeout=120, metadata=None, catalog_name=None, event_store=None, placement_id=None): self._client = None self.retry = retry self.timeout = timeout self.metadata = metadata self.name = f"projects/{project}/locations/global/catalogs/"\ f"{catalog_name}/eventStores/{event_store}/placements/"\ f"{placement_id}" self.counter = Metrics.counter(self.__class__, "api_calls") def setup(self): if self._client is None: self._client = get_recommendation_prediction_client() def process(self, element): user_event = recommendationengine.UserEvent(element) request = recommendationengine.PredictRequest( name=self.name, user_event=user_event) try: prediction = self._client.predict(request) self.counter.inc() yield [ recommendationengine.PredictResponse.to_dict(p) for p in prediction.pages ] except Exception: yield pvalue.TaggedOutput(self.FAILED_PREDICTIONS, user_event)