#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A connector for sending API requests to the GCP Video Intelligence API."""
from __future__ import absolute_import
from typing import Optional
from typing import Tuple
from typing import Union
from future.utils import binary_type
from future.utils import text_type
from apache_beam import typehints
from apache_beam.metrics import Metrics
from apache_beam.transforms import DoFn
from apache_beam.transforms import ParDo
from apache_beam.transforms import PTransform
from cachetools.func import ttl_cache
# Fail at import time with a message that names the missing package
# (google.cloud.videointelligence) instead of the bare ImportError.
try:
  from google.cloud import videointelligence
except ImportError:
  raise ImportError(
      'Google Cloud Video Intelligence not supported for this execution '
      'environment (could not import google.cloud.videointelligence).')
# Public API of this module; the DoFn classes are private helpers.
__all__ = ['AnnotateVideo', 'AnnotateVideoWithContext']
@ttl_cache(maxsize=128, ttl=3600)
def get_videointelligence_client():
  """Creates a ``VideoIntelligenceServiceClient``.

  The call is wrapped in ``ttl_cache`` (``maxsize=128``, ``ttl=3600``), so
  callers receive the cached client until the cache entry expires.
  """
  return videointelligence.VideoIntelligenceServiceClient()
class AnnotateVideo(PTransform):
  """A ``PTransform`` for annotating video using the GCP Video Intelligence API.

  ref: https://cloud.google.com/video-intelligence/docs

  Sends each element to the GCP Video Intelligence API. Element is a
  Union[text_type, binary_type] of either a URI (e.g. a GCS URI) or
  binary_type base64-encoded video data.
  Accepts an `AsDict` side input that maps each video to a video context.
  """
  def __init__(
      self,
      features,
      location_id=None,
      metadata=None,
      timeout=120,
      context_side_input=None):
    """
    Args:
      features: (List[``videointelligence_v1.enums.Feature``]) Required.
        The Video Intelligence API features to detect
      location_id: (str) Optional.
        Cloud region where annotation should take place.
        If no region is specified, a region will be determined
        based on video file location.
      metadata: (Sequence[Tuple[str, str]]) Optional.
        Additional metadata that is provided to the method.
      timeout: (int) Optional.
        The time in seconds to wait for the response from the
        Video Intelligence API
      context_side_input: (beam.pvalue.AsDict) Optional.
        An ``AsDict`` of a PCollection to be passed to the
        _VideoAnnotateFn as the video context mapping containing additional
        video context and/or feature-specific parameters.
        Example usage::

          video_contexts =
            [('gs://cloud-samples-data/video/cat.mp4', Union[dict,
            ``videointelligence_v1.types.VideoContext``]),
            ('gs://some-other-video/sample.mp4', Union[dict,
            ``videointelligence_v1.types.VideoContext``]),]

          context_side_input =
            (
              p
              | "Video contexts" >> beam.Create(video_contexts)
            )

          videointelligenceml.AnnotateVideo(features,
            context_side_input=beam.pvalue.AsDict(context_side_input))
    """
    super(AnnotateVideo, self).__init__()
    self.features = features
    self.location_id = location_id
    self.metadata = metadata
    self.timeout = timeout
    self.context_side_input = context_side_input

  def expand(self, pvalue):
    """Applies ``_VideoAnnotateFn`` to every element of ``pvalue``.

    Args:
      pvalue: The input PCollection of video URIs or video content.

    Returns:
      The PCollection produced by the ``ParDo``, i.e. whatever
      ``_VideoAnnotateFn`` yields for each input element.
    """
    # The side input is handed to the DoFn's ``process`` as the
    # ``context_side_input`` keyword; it stays ``None`` when not configured.
    return pvalue | ParDo(
        _VideoAnnotateFn(
            features=self.features,
            location_id=self.location_id,
            metadata=self.metadata,
            timeout=self.timeout),
        context_side_input=self.context_side_input)
@typehints.with_input_types(
    Union[text_type, binary_type],
    Optional[videointelligence.types.VideoContext])
class _VideoAnnotateFn(DoFn):
  """DoFn that submits each input element to the GCP Video Intelligence API.

  For every element one ``annotate_video`` request is issued and the result
  of that request
  (``google.cloud.videointelligence_v1.types.AnnotateVideoResponse``)
  is emitted.
  """
  def __init__(self, features, location_id, metadata, timeout):
    """Stores the request settings; the client is obtained in start_bundle.

    Args:
      features: The Video Intelligence API features to detect.
      location_id: Value forwarded as ``location_id`` of each request.
      metadata: Value forwarded as ``metadata`` of each request.
      timeout: Timeout passed to ``result()`` when waiting for a response.
    """
    super(_VideoAnnotateFn, self).__init__()
    self.features = features
    self.location_id = location_id
    self.metadata = metadata
    self.timeout = timeout
    self.counter = Metrics.counter(self.__class__, "API Calls")
    self._client = None  # Assigned in start_bundle.

  def start_bundle(self):
    """Fetches the (cached) Video Intelligence client for this bundle."""
    self._client = get_videointelligence_client()

  def _annotate_video(self, element, video_context):
    """Issues the ``annotate_video`` request for a single element.

    Args:
      element: The video, either as text (sent as ``input_uri``) or as any
        other value (sent as ``input_content``).
      video_context: The video context to send with the request, or None.

    Returns:
      Whatever the client's ``annotate_video`` call returns.
    """
    request = dict(
        features=self.features,
        video_context=video_context,
        location_id=self.location_id,
        metadata=self.metadata)
    # Text elements are treated as a URI; everything else as raw content.
    if isinstance(element, text_type):
      request['input_uri'] = element
    else:
      request['input_content'] = element
    return self._client.annotate_video(**request)

  def process(self, element, context_side_input=None, *args, **kwargs):
    """Annotates one element and yields the API result.

    Args:
      element: The video URI or video content to annotate.
      context_side_input: Optional mapping from element to video context.
        When it is missing or empty no video context is sent.

    Yields:
      The value of ``result(timeout=self.timeout)`` on the object returned
      by the ``annotate_video`` request.
    """
    video_context = (
        context_side_input.get(element) if context_side_input else None)
    operation = self._annotate_video(element, video_context)
    self.counter.inc()
    yield operation.result(timeout=self.timeout)
class AnnotateVideoWithContext(AnnotateVideo):
  """A ``PTransform`` for annotating video using the GCP Video Intelligence API.

  ref: https://cloud.google.com/video-intelligence/docs

  Sends each element to the GCP Video Intelligence API.
  Element is a tuple of

    (Union[text_type, binary_type],
    Optional[videointelligence.types.VideoContext])

  where the former is either a URI (e.g. a GCS URI) or
  binary_type base64-encoded video data.
  """
  def __init__(self, features, location_id=None, metadata=None, timeout=120):
    """
    Args:
      features: (List[``videointelligence_v1.enums.Feature``]) Required.
        The Video Intelligence API features to detect
      location_id: (str) Optional.
        Cloud region where annotation should take place.
        If no region is specified, a region will be determined
        based on video file location.
      metadata: (Sequence[Tuple[str, str]]) Optional.
        Additional metadata that is provided to the method.
      timeout: (int) Optional.
        The time in seconds to wait for the response from the
        Video Intelligence API
    """
    # No side input here: the video context travels inside each element.
    super(AnnotateVideoWithContext, self).__init__(
        features=features,
        location_id=location_id,
        metadata=metadata,
        timeout=timeout)

  def expand(self, pvalue):
    """Applies ``_VideoAnnotateFnWithContext`` to every element of ``pvalue``.

    Args:
      pvalue: The input PCollection of (video, video_context) tuples.

    Returns:
      The PCollection produced by the ``ParDo``, i.e. whatever
      ``_VideoAnnotateFnWithContext`` yields for each input element.
    """
    return pvalue | ParDo(
        _VideoAnnotateFnWithContext(
            features=self.features,
            location_id=self.location_id,
            metadata=self.metadata,
            timeout=self.timeout))
@typehints.with_input_types(
    Tuple[Union[text_type, binary_type],
          Optional[videointelligence.types.VideoContext]])
class _VideoAnnotateFnWithContext(_VideoAnnotateFn):
  """DoFn for elements that carry their own video context.

  Each input is a ``(video, video_context)`` tuple. The pair is split, sent
  to the GCP Video Intelligence API service, and the result of the request
  (``google.cloud.videointelligence_v1.types.AnnotateVideoResponse``)
  is emitted.
  """
  def __init__(self, features, location_id, metadata, timeout):
    """Forwards all settings unchanged to ``_VideoAnnotateFn``."""
    super(_VideoAnnotateFnWithContext, self).__init__(
        features, location_id, metadata, timeout)

  def process(self, element, *args, **kwargs):
    """Annotates one ``(video, video_context)`` tuple and yields the result.

    Args:
      element: A two-item tuple of the video (URI or content) and its
        video context.

    Yields:
      The value of ``result(timeout=self.timeout)`` on the object returned
      by the ``annotate_video`` request.
    """
    video, video_context = element
    operation = self._annotate_video(video, video_context)
    self.counter.inc()
    yield operation.result(timeout=self.timeout)