Source code for apache_beam.ml.gcp.naturallanguageml

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from typing import Mapping
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Union

import apache_beam as beam
from apache_beam.metrics import Metrics

try:
  from google.cloud import language
  from google.cloud import language_v1
except ImportError:
  raise ImportError(
      'Google Cloud Natural Language API not supported for this execution '
      'environment (could not import Natural Language API client).')

__all__ = ['Document', 'AnnotateText']


[docs]class Document(object): """Represents the input to :class:`AnnotateText` transform. Args: content (str): The content of the input or the Google Cloud Storage URI where the file is stored. type (`Union[str, google.cloud.language_v1.Document.Type]`): Text type. Possible values are `HTML`, `PLAIN_TEXT`. The default value is `PLAIN_TEXT`. language_hint (`Optional[str]`): The language of the text. If not specified, language will be automatically detected. Values should conform to ISO-639-1 standard. encoding (`Optional[str]`): Text encoding. Possible values are: `NONE`, `UTF8`, `UTF16`, `UTF32`. The default value is `UTF8`. from_gcs (bool): Whether the content should be interpret as a Google Cloud Storage URI. The default value is :data:`False`. """ def __init__( self, content, # type: str type='PLAIN_TEXT', # type: Union[str, language_v1.Document.Type] language_hint=None, # type: Optional[str] encoding='UTF8', # type: Optional[str] from_gcs=False # type: bool ): self.content = content self.type = type self.encoding = encoding self.language_hint = language_hint self.from_gcs = from_gcs
[docs] @staticmethod def to_dict(document): # type: (Document) -> Mapping[str, Optional[str]] if document.from_gcs: dict_repr = {'gcs_content_uri': document.content} else: dict_repr = {'content': document.content} dict_repr.update({ 'type': document.type, 'language': document.language_hint }) return dict_repr
[docs]@beam.ptransform_fn def AnnotateText( pcoll, # type: beam.pvalue.PCollection features, # type: Union[Mapping[str, bool], language_v1.AnnotateTextRequest.Features] timeout=None, # type: Optional[float] metadata=None # type: Optional[Sequence[Tuple[str, str]]] ): """A :class:`~apache_beam.transforms.ptransform.PTransform` for annotating text using the Google Cloud Natural Language API: https://cloud.google.com/natural-language/docs. Args: pcoll (:class:`~apache_beam.pvalue.PCollection`): An input PCollection of :class:`Document` objects. features (`Union[Mapping[str, bool], types.AnnotateTextRequest.Features]`): A dictionary of natural language operations to be performed on given text in the following format:: {'extact_syntax'=True, 'extract_entities'=True} timeout (`Optional[float]`): The amount of time, in seconds, to wait for the request to complete. The timeout applies to each individual retry attempt. metadata (`Optional[Sequence[Tuple[str, str]]]`): Additional metadata that is provided to the method. """ return pcoll | beam.ParDo(_AnnotateTextFn(features, timeout, metadata))
@beam.typehints.with_input_types(Document) @beam.typehints.with_output_types(language_v1.AnnotateTextResponse) class _AnnotateTextFn(beam.DoFn): def __init__( self, features, # type: Union[Mapping[str, bool], language_v1.AnnotateTextRequest.Features] timeout, # type: Optional[float] metadata=None # type: Optional[Sequence[Tuple[str, str]]] ): self.features = features self.timeout = timeout self.metadata = metadata self.api_calls = Metrics.counter(self.__class__.__name__, 'api_calls') self.client = None def setup(self): self.client = self._get_api_client() @staticmethod def _get_api_client(): # type: () -> language.LanguageServiceClient return language.LanguageServiceClient() def process(self, element): response = self.client.annotate_text( document=Document.to_dict(element), features=self.features, encoding_type=element.encoding, timeout=self.timeout, metadata=self.metadata) self.api_calls.inc() yield response