#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Mapping
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Union
import apache_beam as beam
from apache_beam.metrics import Metrics
try:
    from google.cloud import language
    from google.cloud import language_v1
except ImportError as err:
    # Fail at import time with an actionable message. Chaining with ``from``
    # marks the original import failure as the direct cause, so it stays
    # visible in the traceback instead of reading as a secondary error.
    raise ImportError(
        'Google Cloud Natural Language API not supported for this execution '
        'environment (could not import Natural Language API client).') from err

# Public API of this module.
__all__ = ['Document', 'AnnotateText']
class Document(object):
    """Represents the input to the :class:`AnnotateText` transform.

    Args:
      content (str): The content of the input or the Google Cloud Storage URI
        where the file is stored.
      type (`Union[str, google.cloud.language_v1.Document.Type]`): Text type.
        Possible values are `HTML`, `PLAIN_TEXT`. The default value is
        `PLAIN_TEXT`.
      language_hint (`Optional[str]`): The language of the text. If not
        specified, language will be automatically detected. Values should
        conform to ISO-639-1 standard.
      encoding (`Optional[str]`): Text encoding. Possible values are: `NONE`,
        `UTF8`, `UTF16`, `UTF32`. The default value is `UTF8`.
      from_gcs (bool): Whether the content should be interpreted as a Google
        Cloud Storage URI. The default value is :data:`False`.
    """
    def __init__(
        self,
        content,  # type: str
        type='PLAIN_TEXT',  # type: Union[str, language_v1.Document.Type]
        language_hint=None,  # type: Optional[str]
        encoding='UTF8',  # type: Optional[str]
        from_gcs=False  # type: bool
    ):
        self.content = content
        # ``type`` shadows the builtin, but it is part of the public keyword
        # interface, so the name is kept.
        self.type = type
        self.encoding = encoding
        self.language_hint = language_hint
        self.from_gcs = from_gcs

    @staticmethod
    def to_dict(document):
        # type: (Document) -> Mapping[str, Union[None, str, language_v1.Document.Type]]
        """Convert ``document`` to the mapping used as the API request document.

        The content is stored under ``'gcs_content_uri'`` when
        ``document.from_gcs`` is true, and under ``'content'`` otherwise.
        ``'type'`` and ``'language'`` are always present; ``'language'`` is
        ``None`` when no language hint was given. ``encoding`` is not part of
        the returned mapping.

        Args:
          document (Document): The document to convert.

        Returns:
          A new dict with the content key, ``'type'`` and ``'language'``.
        """
        content_key = 'gcs_content_uri' if document.from_gcs else 'content'
        return {
            content_key: document.content,
            'type': document.type,
            'language': document.language_hint,
        }
@beam.ptransform_fn
def AnnotateText(
    pcoll,  # type: beam.pvalue.PCollection
    features,  # type: Union[Mapping[str, bool], language_v1.AnnotateTextRequest.Features]
    timeout=None,  # type: Optional[float]
    metadata=None  # type: Optional[Sequence[Tuple[str, str]]]
):
    """A :class:`~apache_beam.transforms.ptransform.PTransform`
    for annotating text using the Google Cloud Natural Language API:
    https://cloud.google.com/natural-language/docs.

    Args:
      pcoll (:class:`~apache_beam.pvalue.PCollection`): An input PCollection of
        :class:`Document` objects.
      features (`Union[Mapping[str, bool], language_v1.AnnotateTextRequest.Features]`):
        A dictionary of natural language operations to be performed on given
        text in the following format::

          {'extract_syntax': True, 'extract_entities': True}

      timeout (`Optional[float]`): The amount of time, in seconds, to wait
        for the request to complete. The timeout applies to each individual
        retry attempt.
      metadata (`Optional[Sequence[Tuple[str, str]]]`): Additional metadata
        that is provided to the method.

    Returns:
      A PCollection containing one annotation response for each input
      :class:`Document`.
    """
    # All per-element work (one API call per Document) happens in the DoFn.
    return pcoll | beam.ParDo(_AnnotateTextFn(features, timeout, metadata))
@beam.typehints.with_input_types(Document)
@beam.typehints.with_output_types(language_v1.AnnotateTextResponse)
class _AnnotateTextFn(beam.DoFn):
    """DoFn that sends each :class:`Document` to ``annotate_text``.

    Exactly one API request is made per input element, and its response is
    emitted unchanged. Every request increments the ``api_calls`` counter.
    """
    def __init__(
        self,
        features,  # type: Union[Mapping[str, bool], language_v1.AnnotateTextRequest.Features]
        timeout,  # type: Optional[float]
        metadata=None  # type: Optional[Sequence[Tuple[str, str]]]
    ):
        # Request options, forwarded verbatim to every annotate_text call.
        self.features = features
        self.timeout = timeout
        self.metadata = metadata
        # Metric namespace is the class name ('_AnnotateTextFn').
        self.api_calls = Metrics.counter(type(self).__name__, 'api_calls')
        # No client at construction time; setup() creates it.
        self.client = None

    def setup(self):
        """Create the API client that :meth:`process` uses."""
        self.client = self._get_api_client()

    @staticmethod
    def _get_api_client():
        # type: () -> language.LanguageServiceClient
        """Return a new ``LanguageServiceClient``. Kept separate so it can be replaced."""
        return language.LanguageServiceClient()

    def process(self, element):
        """Annotate a single :class:`Document` and emit the API response."""
        annotate = self.client.annotate_text
        annotation = annotate(
            document=Document.to_dict(element),
            features=self.features,
            encoding_type=element.encoding,
            timeout=self.timeout,
            metadata=self.metadata,
        )
        self.api_calls.inc()
        yield annotation