#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A ValueProvider class to implement templates with both statically
and dynamically provided values.
"""
from functools import wraps
from apache_beam import error
# Public API of this module: the names exported by ``from <module> import *``.
__all__ = [
    'ValueProvider',
    'StaticValueProvider',
    'RuntimeValueProvider',
    'check_accessible',
    ]
class ValueProvider(object):
  """Base interface for objects that hand out a value on request.

  Both methods here only raise ``NotImplementedError``; derived classes
  are expected to override them.
  """

  def is_accessible(self):
    """Report whether ``get()`` can currently be called.

    Raises:
      NotImplementedError: Always, in this base class.
    """
    raise NotImplementedError(
        'ValueProvider.is_accessible implemented in derived classes'
    )

  def get(self):
    """Return the provided value.

    Raises:
      NotImplementedError: Always, in this base class.
    """
    raise NotImplementedError(
        'ValueProvider.get implemented in derived classes'
    )
class StaticValueProvider(ValueProvider):
  """A ValueProvider whose value is fixed when the object is constructed."""

  def __init__(self, value_type, value):
    """Convert ``value`` with ``value_type`` and store the result.

    Args:
      value_type: Callable applied to ``value`` to produce the stored value
        (for example ``int`` or ``str``).
      value: The raw value to convert.
    """
    self.value_type = value_type
    # Conversion happens once, here; get() returns the converted object.
    self.value = value_type(value)

  def is_accessible(self):
    """Always ``True``: the value is available as soon as it is constructed."""
    return True

  def get(self):
    """Return the value stored (already converted) by ``__init__``."""
    return self.value

  def __str__(self):
    """Return ``str()`` of the stored value."""
    return str(self.value)
class RuntimeValueProvider(ValueProvider):
  """A ValueProvider that looks its value up in shared runtime options.

  The options live in the class attribute ``runtime_options``; until
  ``set_runtime_options`` stores something other than ``None`` there, the
  provider is not accessible and ``get()`` raises.
  """

  # Shared by every instance (and subclass): all reads and writes go through
  # ``RuntimeValueProvider.runtime_options`` explicitly, never ``self``/``cls``.
  runtime_options = None

  def __init__(self, option_name, value_type, default_value):
    """Record how to resolve the value later.

    Args:
      option_name: Key used to look the value up in ``runtime_options``.
      value_type: Callable applied to the looked-up value in ``get()``.
      default_value: Returned as-is by ``get()`` when the lookup yields a
        falsy result.
    """
    self.option_name = option_name
    self.default_value = default_value
    self.value_type = value_type

  def is_accessible(self):
    """Return ``True`` once runtime options have been set (are not ``None``)."""
    return RuntimeValueProvider.runtime_options is not None

  def get(self):
    """Resolve the value from the runtime options.

    Returns:
      ``value_type(candidate)`` when the option lookup returns a truthy
      ``candidate``; otherwise ``default_value`` unchanged (it is not passed
      through ``value_type``).

    Raises:
      error.RuntimeValueProviderError: If no runtime options have been set.
    """
    if RuntimeValueProvider.runtime_options is None:
      raise error.RuntimeValueProviderError(
          '%s.get() not called from a runtime context' % self)

    # Assumes runtime_options is dict-like (has ``.get``) -- TODO confirm
    # against what callers pass to set_runtime_options.
    candidate = RuntimeValueProvider.runtime_options.get(self.option_name)

    # NOTE(review): this is a truthiness test, so a falsy runtime value
    # (0, '', False) is treated the same as a missing option and the default
    # is used instead. Kept as-is because callers may rely on it.
    if candidate:
      return self.value_type(candidate)
    return self.default_value

  @classmethod
  def set_runtime_options(cls, pipeline_options):
    """Store ``pipeline_options`` as the shared runtime options.

    The assignment targets ``RuntimeValueProvider`` rather than ``cls``, so
    calling this through a subclass still updates the single shared slot.

    Args:
      pipeline_options: Object stored in ``runtime_options``; passing
        ``None`` makes every provider inaccessible again.
    """
    RuntimeValueProvider.runtime_options = pipeline_options

  def __str__(self):
    """Describe the provider by class name, option, type and default."""
    return '%s(option: %s, type: %s, default_value: %s)' % (
        self.__class__.__name__,
        self.option_name,
        self.value_type.__name__,
        repr(self.default_value)
    )
def check_accessible(value_provider_list):
  """Build a method decorator that checks ValueProvider accessibility.

  Before each call to the decorated method, every attribute of ``self``
  named in ``value_provider_list`` is fetched and asked ``is_accessible()``.

  Args:
    value_provider_list: A ``list`` of attribute names (strings) to look up
      on ``self``.

  Returns:
    A decorator to apply to a method taking ``self`` as first argument.

  Raises:
    AssertionError: At decoration time, if ``value_provider_list`` is not a
      ``list``.
    error.RuntimeValueProviderError: At call time, if any named attribute
      reports that it is not accessible.
  """
  assert isinstance(value_provider_list, list)

  def _check_accessible(fnc):
    @wraps(fnc)
    def _f(self, *args, **kwargs):
      # All attributes are resolved up front, so a missing attribute raises
      # AttributeError before any accessibility check runs.
      providers = [getattr(self, vp) for vp in value_provider_list]
      for obj in providers:
        if not obj.is_accessible():
          raise error.RuntimeValueProviderError('%s not accessible' % obj)
      return fnc(self, *args, **kwargs)
    return _f

  return _check_accessible