#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A connector for sending API requests to the GCP Video Intelligence API."""
from typing import Optional
from typing import Tuple
from typing import Union
from apache_beam import typehints
from apache_beam.metrics import Metrics
from apache_beam.transforms import DoFn
from apache_beam.transforms import ParDo
from apache_beam.transforms import PTransform
from cachetools.func import ttl_cache
try:
from google.cloud import videointelligence
except ImportError:
raise ImportError(
'Google Cloud Video Intelligence not supported for this execution '
'environment (could not import google.cloud.videointelligence).')
__all__ = ['AnnotateVideo', 'AnnotateVideoWithContext']
@ttl_cache(maxsize=128, ttl=3600)
def get_videointelligence_client():
"""Returns a Cloud Video Intelligence client."""
_client = videointelligence.VideoIntelligenceServiceClient()
return _client
[docs]class AnnotateVideo(PTransform):
"""A ``PTransform`` for annotating video using the GCP Video Intelligence API
ref: https://cloud.google.com/video-intelligence/docs
Sends each element to the GCP Video Intelligence API. Element is a
Union[str, bytes] of either an URI (e.g. a GCS URI) or
bytes base64-encoded video data.
Accepts an `AsDict` side input that maps each video to a video context.
"""
def __init__(
self,
features,
location_id=None,
metadata=None,
timeout=120,
context_side_input=None):
"""
Args:
features: (List[``videointelligence_v1.enums.Feature``]) Required.
The Video Intelligence API features to detect
location_id: (str) Optional.
Cloud region where annotation should take place.
If no region is specified, a region will be determined
based on video file location.
metadata: (Sequence[Tuple[str, str]]) Optional.
Additional metadata that is provided to the method.
timeout: (int) Optional.
The time in seconds to wait for the response from the
Video Intelligence API
context_side_input: (beam.pvalue.AsDict) Optional.
An ``AsDict`` of a PCollection to be passed to the
_VideoAnnotateFn as the video context mapping containing additional
video context and/or feature-specific parameters.
Example usage::
video_contexts =
[('gs://cloud-samples-data/video/cat.mp4', Union[dict,
``videointelligence_v1.types.VideoContext``]),
('gs://some-other-video/sample.mp4', Union[dict,
``videointelligence_v1.types.VideoContext``]),]
context_side_input =
(
p
| "Video contexts" >> beam.Create(video_contexts)
)
videointelligenceml.AnnotateVideo(features,
context_side_input=beam.pvalue.AsDict(context_side_input)))
"""
super().__init__()
self.features = features
self.location_id = location_id
self.metadata = metadata
self.timeout = timeout
self.context_side_input = context_side_input
[docs] def expand(self, pvalue):
return pvalue | ParDo(
_VideoAnnotateFn(
features=self.features,
location_id=self.location_id,
metadata=self.metadata,
timeout=self.timeout),
context_side_input=self.context_side_input)
@typehints.with_input_types(
Union[str, bytes], Optional[videointelligence.types.VideoContext])
class _VideoAnnotateFn(DoFn):
"""A DoFn that sends each input element to the GCP Video Intelligence API
service and outputs an element with the return result of the API
(``google.cloud.videointelligence_v1.types.AnnotateVideoResponse``).
"""
def __init__(self, features, location_id, metadata, timeout):
super().__init__()
self._client = None
self.features = features
self.location_id = location_id
self.metadata = metadata
self.timeout = timeout
self.counter = Metrics.counter(self.__class__, "API Calls")
def start_bundle(self):
self._client = get_videointelligence_client()
def _annotate_video(self, element, video_context):
if isinstance(element, str): # Is element an URI to a GCS bucket
response = self._client.annotate_video(
input_uri=element,
features=self.features,
video_context=video_context,
location_id=self.location_id,
metadata=self.metadata)
else: # Is element raw bytes
response = self._client.annotate_video(
input_content=element,
features=self.features,
video_context=video_context,
location_id=self.location_id,
metadata=self.metadata)
return response
def process(self, element, context_side_input=None, *args, **kwargs):
if context_side_input: # If we have a side input video context, use that
video_context = context_side_input.get(element)
else:
video_context = None
response = self._annotate_video(element, video_context)
self.counter.inc()
yield response.result(timeout=self.timeout)
[docs]class AnnotateVideoWithContext(AnnotateVideo):
"""A ``PTransform`` for annotating video using the GCP Video Intelligence API
ref: https://cloud.google.com/video-intelligence/docs
Sends each element to the GCP Video Intelligence API.
Element is a tuple of
(Union[str, bytes],
Optional[videointelligence.types.VideoContext])
where the former is either an URI (e.g. a GCS URI) or
bytes base64-encoded video data
"""
def __init__(self, features, location_id=None, metadata=None, timeout=120):
"""
Args:
features: (List[``videointelligence_v1.enums.Feature``]) Required.
the Video Intelligence API features to detect
location_id: (str) Optional.
Cloud region where annotation should take place.
If no region is specified, a region will be determined
based on video file location.
metadata: (Sequence[Tuple[str, str]]) Optional.
Additional metadata that is provided to the method.
timeout: (int) Optional.
The time in seconds to wait for the response from the
Video Intelligence API
"""
super().__init__(
features=features,
location_id=location_id,
metadata=metadata,
timeout=timeout)
[docs] def expand(self, pvalue):
return pvalue | ParDo(
_VideoAnnotateFnWithContext(
features=self.features,
location_id=self.location_id,
metadata=self.metadata,
timeout=self.timeout))
@typehints.with_input_types(
Tuple[Union[str, bytes], Optional[videointelligence.types.VideoContext]])
class _VideoAnnotateFnWithContext(_VideoAnnotateFn):
"""A DoFn that unpacks each input tuple to element, video_context variables
and sends these to the GCP Video Intelligence API service and outputs
an element with the return result of the API
(``google.cloud.videointelligence_v1.types.AnnotateVideoResponse``).
"""
def __init__(self, features, location_id, metadata, timeout):
super().__init__(
features=features,
location_id=location_id,
metadata=metadata,
timeout=timeout)
def process(self, element, *args, **kwargs):
element, video_context = element # Unpack (video, video_context) tuple
response = self._annotate_video(element, video_context)
self.counter.inc()
yield response.result(timeout=self.timeout)