#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""PipelineRunner, an abstract base runner object."""
# pytype: skip-file
import importlib
import logging
from typing import TYPE_CHECKING
from typing import Iterable
from typing import Optional
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import PortableOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.common import group_by_key_input_visitor
from apache_beam.transforms import environments
if TYPE_CHECKING:
  from apache_beam import pvalue
  from apache_beam import PTransform
  from apache_beam.pipeline import Pipeline

__all__ = ['PipelineRunner', 'PipelineState', 'PipelineResult']

_RUNNER_MAP = {
    path.rsplit('.', maxsplit=1)[-1].lower(): path
    for path in StandardOptions.ALL_KNOWN_RUNNERS
}
# Allow this alias, but don't make public.
_RUNNER_MAP['pythonrpcdirectrunner'] = (
    'apache_beam.runners.experimental'
    '.python_rpc_direct.python_rpc_direct_runner.PythonRPCDirectRunner')
_LOGGER = logging.getLogger(__name__)


def create_runner(runner_name):
  # type: (str) -> PipelineRunner
  """For internal use only; no backwards-compatibility guarantees.
  Creates a runner instance from a runner class name.
  Args:
    runner_name: Name of the pipeline runner. Possible values are listed in
      _RUNNER_MAP above.
  Returns:
    A runner object.
  Raises:
    RuntimeError: if an invalid runner name is used.
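
  Example (illustrative; assumes the in-process DirectRunner that ships with
  the SDK)::

    runner = create_runner('DirectRunner')  # canonical class name
    runner = create_runner('direct')        # lowercased alias resolves too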
  """
  # Get the qualified runner name by looking up the lower-cased runner name.
  # If that fails, try again with 'runner' appended to the name.
  # If that also fails, use the given runner name as is.
  runner_name = _RUNNER_MAP.get(
      runner_name.lower(),
      _RUNNER_MAP.get(runner_name.lower() + 'runner', runner_name))
  if '.' in runner_name:
    module, runner = runner_name.rsplit('.', 1)
    try:
      return getattr(importlib.import_module(module), runner)()
    except ImportError:
      if 'dataflow' in runner_name.lower():
        raise ImportError(
            'Google Cloud Dataflow runner not available, '
            'please install apache_beam[gcp]')
      elif 'interactive' in runner_name.lower():
        raise ImportError(
            'Interactive runner not available, '
            'please install apache_beam[interactive]')
      else:
        raise
  else:
    raise ValueError(
        'Unexpected pipeline runner: %s. Valid values are %s '
        'or the fully qualified name of a PipelineRunner subclass.' %
        (runner_name, ', '.join(StandardOptions.KNOWN_RUNNER_NAMES)))


class PipelineRunner(object):
  """A runner of a pipeline object.
  The base runner provides a run() method for visiting every node in the
  pipeline's DAG and executing the transforms computing the PValue in the node.
  A custom runner will typically provide implementations for some of the
  transform methods (ParDo, GroupByKey, Create, etc.). It may also
  provide a new implementation for clear_pvalue(), which is used to wipe out
  materialized values in order to reduce footprint.
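
  Example (an illustrative, minimal subclass; a real runner would translate
  and execute the pipeline proto rather than merely inspect it)::

    class InspectingRunner(PipelineRunner):
      def run_portable_pipeline(self, pipeline, options):
        print(pipeline)  # the beam_runner_api_pb2.Pipeline proto
        return PipelineResult(PipelineState.DONE)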
  """

  def run(self,
          transform,  # type: PTransform
          options=None  # type: Optional[PipelineOptions]
         ):
    # type: (...) -> PipelineResult
    """Run the given transform or callable with this runner.
    Blocks until the pipeline is complete.  See also `PipelineRunner.run_async`.
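
    Example (illustrative; `runner` stands for a concrete runner instance and
    `my_transform` for any root PTransform)::

      result = runner.run(my_transform, options)  # blocks until terminal
      print(result.state)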
    """
    result = self.run_async(transform, options)
    result.wait_until_finish()
    return result

  def run_async(self,
                transform,  # type: PTransform
                options=None  # type: Optional[PipelineOptions]
               ):
    # type: (...) -> PipelineResult
    """Run the given transform or callable with this runner.
    May return immediately, executing the pipeline in the background.
    The returned result object can be queried for progress, and
    `wait_until_finish` may be called to block until completion.
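
    Example (illustrative; assumes `import apache_beam as beam` and a
    concrete `runner`)::

      result = runner.run_async(
          lambda root: root | beam.Create([1, 2, 3]) | beam.Map(print),
          options)
      result.wait_until_finish()  # block only when the result is needed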
    """
    # Imported here to avoid circular dependencies.
    # pylint: disable=wrong-import-order, wrong-import-position
    from apache_beam import PTransform
    from apache_beam.pvalue import PBegin
    from apache_beam.pipeline import Pipeline
    p = Pipeline(runner=self, options=options)
    if isinstance(transform, PTransform):
      p | transform
    else:
      transform(PBegin(p))
    return p.run()

  def run_portable_pipeline(
      self, pipeline: beam_runner_api_pb2.Pipeline,
      options: PipelineOptions) -> 'PipelineResult':
    """Execute the entire pipeline.
    Runners should override this method.
    """
    raise NotImplementedError

  def default_environment(
      self, options: PipelineOptions) -> environments.Environment:
    """Returns the default environment that should be used for this runner.
    Runners may override this method to provide alternative environments.
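
    Example (an illustrative override; the container image name is
    hypothetical)::

      class MyRunner(PipelineRunner):
        def default_environment(self, options):
          return environments.DockerEnvironment.from_container_image(
              'gcr.io/my-project/custom-sdk:latest')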
    """
    return environments.Environment.from_options(
        options.view_as(PortableOptions))

  def run_pipeline(
      self,
      pipeline,  # type: Pipeline
      options  # type: PipelineOptions
  ):
    # type: (...) -> PipelineResult
    """Execute the entire pipeline or the sub-DAG reachable from a node.
    """
    pipeline.visit(
        group_by_key_input_visitor(
            not options.view_as(TypeOptions).allow_non_deterministic_key_coders)
    )
    # TODO: https://github.com/apache/beam/issues/19168
    # portable runner specific default
    if options.view_as(SetupOptions).sdk_location == 'default':
      options.view_as(SetupOptions).sdk_location = 'container'
    return self.run_portable_pipeline(
        pipeline.to_runner_api(
            default_environment=self.default_environment(options)),
        options)

  def apply(self,
            transform,  # type: PTransform
            input,  # type: Optional[pvalue.PValue]
            options  # type: PipelineOptions
           ):
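    """Runner callback for a pipeline.apply call (dispatches to
    apply_PTransform)."""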
    # TODO(robertwb): Remove indirection once internal references are fixed.
    return self.apply_PTransform(transform, input, options)

  def is_fnapi_compatible(self):
    """Whether to enable the beam_fn_api experiment by default."""
    return True

  def check_requirements(
      self,
      pipeline_proto: beam_runner_api_pb2.Pipeline,
      supported_requirements: Iterable[str]):
    """Check that this runner can satisfy all pipeline requirements."""
    # Imported here to avoid circular dependencies.
    # pylint: disable=wrong-import-order, wrong-import-position
    from apache_beam.runners.portability.fn_api_runner import translations
    supported_requirements = set(supported_requirements)
    for requirement in pipeline_proto.requirements:
      if requirement not in supported_requirements:
        raise ValueError(
            'Unable to run pipeline with requirement: %s' % requirement)
    for transform in pipeline_proto.components.transforms.values():
      if transform.spec.urn == common_urns.primitives.TEST_STREAM.urn:
        if common_urns.primitives.TEST_STREAM.urn not in supported_requirements:
          raise NotImplementedError(transform.spec.urn)
      elif transform.spec.urn in translations.PAR_DO_URNS:
        payload = beam_runner_api_pb2.ParDoPayload.FromString(
            transform.spec.payload)
        for timer in payload.timer_family_specs.values():
          if timer.time_domain not in (
              beam_runner_api_pb2.TimeDomain.EVENT_TIME,
              beam_runner_api_pb2.TimeDomain.PROCESSING_TIME):
            raise NotImplementedError(timer.time_domain)


# FIXME: replace with PipelineState(str, enum.Enum)
class PipelineState(object):
  """State of the Pipeline, as returned by :attr:`PipelineResult.state`.
  This is meant to be the union of all the states any runner can put a
  pipeline in. Currently, it represents the values of the dataflow
  API JobState enum.
  """
  UNKNOWN = 'UNKNOWN'  # not specified by a runner, or unknown to a runner.
  STARTING = 'STARTING'  # not yet started
  STOPPED = 'STOPPED'  # paused or not yet started
  RUNNING = 'RUNNING'  # currently running
  DONE = 'DONE'  # successfully completed (terminal state)
  FAILED = 'FAILED'  # failed (terminal state)
  CANCELLED = 'CANCELLED'  # explicitly cancelled (terminal state)
  UPDATED = 'UPDATED'  # replaced by another job (terminal state)
  DRAINING = 'DRAINING'  # still processing, no longer reading data
  DRAINED = 'DRAINED'  # draining completed (terminal state)
  PENDING = 'PENDING'  # the job has been created but is not yet running.
  CANCELLING = 'CANCELLING'  # job has been explicitly cancelled and is
  # in the process of stopping
  RESOURCE_CLEANING_UP = 'RESOURCE_CLEANING_UP'  # job's resources are being
  # cleaned up
  UNRECOGNIZED = 'UNRECOGNIZED'  # the job state reported by a runner cannot be
  # interpreted by the SDK.

  @classmethod
  def is_terminal(cls, state):
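    """Returns True if the given state admits no further transitions.

    Example (illustrative)::

      PipelineState.is_terminal(PipelineState.DONE)     # True
      PipelineState.is_terminal(PipelineState.RUNNING)  # False
    """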
    return state in [
        cls.DONE, cls.FAILED, cls.CANCELLED, cls.UPDATED, cls.DRAINED
    ]


class PipelineResult(object):
  """A :class:`PipelineResult` provides access to info about a pipeline."""

  def __init__(self, state):
    self._state = state

  @property
  def state(self):
    """Return the current state of the pipeline execution."""
    return self._state

  def wait_until_finish(self, duration=None):  # pylint: disable=unused-argument
    """Waits until the pipeline finishes and returns the final status.
    Args:
      duration (int): The time to wait (in milliseconds) for job to finish.
        If it is set to :data:`None`, it will wait indefinitely until the job
        is finished.
    Raises:
      IOError: If there is a persistent problem getting job
        information.
      NotImplementedError: If the runner does not support this
        operation.
    Returns:
      The final state of the pipeline, or :data:`None` on timeout.
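
    Example (illustrative; `p` stands for an already constructed Pipeline)::

      result = p.run()
      final_state = result.wait_until_finish(duration=60 * 1000)  # 60 seconds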
    """
    if not PipelineState.is_terminal(self._state):
      raise NotImplementedError()
    # Already terminal: report the final state, per the contract above.
    return self._state

  def cancel(self):
    """Cancels the pipeline execution.
    Raises:
      IOError: If there is a persistent problem getting job
        information.
      NotImplementedError: If the runner does not support this
        operation.
    Returns:
      The final state of the pipeline.
    """
    raise NotImplementedError()

  def metrics(self):
    """Returns :class:`~apache_beam.metrics.metric.MetricResults` object to
    query metrics from the runner.
    Raises:
      NotImplementedError: If the runner does not support this
        operation.
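
    Example (illustrative; assumes the runner populates metrics)::

      from apache_beam.metrics.metric import MetricsFilter

      counters = result.metrics().query(
          MetricsFilter().with_name('my_counter'))['counters']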
    """
    raise NotImplementedError()

  # pylint: disable=unused-argument
  def aggregated_values(self, aggregator_or_name):
    """Return a dict of step names to values of the Aggregator."""
    _LOGGER.warning(
        '%s does not implement aggregated_values', self.__class__.__name__)
    return {}