# Source code for apache_beam.options.value_provider

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A ValueProvider class to implement templates with both statically
and dynamically provided values.
"""

from functools import wraps

from apache_beam import error

# Names exported by ``from apache_beam.options.value_provider import *``.
__all__ = [
    'ValueProvider',
    'StaticValueProvider',
    'RuntimeValueProvider',
    'check_accessible',
]


class ValueProvider(object):
    """Base class for value providers.

    Both methods here only raise ``NotImplementedError``; derived classes are
    expected to supply the real implementations.
    """

    def is_accessible(self):
        """Placeholder to be overridden by derived classes.

        Raises:
          NotImplementedError: always, in this base class.
        """
        message = 'ValueProvider.is_accessible implemented in derived classes'
        raise NotImplementedError(message)

    def get(self):
        """Placeholder to be overridden by derived classes.

        Raises:
          NotImplementedError: always, in this base class.
        """
        message = 'ValueProvider.get implemented in derived classes'
        raise NotImplementedError(message)
class StaticValueProvider(ValueProvider):
    """ValueProvider holding a value that is fixed at construction time."""

    def __init__(self, value_type, value):
        """Convert ``value`` with ``value_type`` and keep both.

        Args:
          value_type: callable applied to ``value``; stored as ``value_type``.
          value: raw value; the converted result is stored as ``value``.
        """
        self.value_type = value_type
        converted = value_type(value)
        self.value = converted

    def is_accessible(self):
        """Return True unconditionally: the value is already stored."""
        return True

    def get(self):
        """Return the converted value stored by the constructor."""
        return self.value

    def __str__(self):
        """Return ``str()`` of the stored value."""
        return str(self.value)
class RuntimeValueProvider(ValueProvider):
    """ValueProvider that reads its value from options installed at runtime.

    The options object lives in the class attribute ``runtime_options`` and
    is installed through :meth:`set_runtime_options`. Until then the
    provider is not accessible and :meth:`get` raises.
    """

    # Shared by all instances; None until set_runtime_options() is called.
    runtime_options = None

    def __init__(self, option_name, value_type, default_value):
        """Record how to look up and convert the option.

        Args:
          option_name: key passed to ``runtime_options.get()``.
          value_type: callable used to convert the looked-up value.
          default_value: returned as-is (not converted) when the lookup
            yields a falsy result.
        """
        self.option_name = option_name
        self.value_type = value_type
        self.default_value = default_value

    def is_accessible(self):
        """Return True once runtime options have been installed."""
        return RuntimeValueProvider.runtime_options is not None

    def get(self):
        """Return the converted runtime value, or the default.

        Returns:
          ``value_type(candidate)`` when the looked-up candidate is truthy,
          otherwise ``default_value``.

        Raises:
          error.RuntimeValueProviderError: if no runtime options are set.
        """
        options = RuntimeValueProvider.runtime_options
        if options is None:
            raise error.RuntimeValueProviderError(
                '%s.get() not called from a runtime context' % self)

        candidate = options.get(self.option_name)
        # Truthiness test, not an ``is None`` test: any falsy candidate
        # (e.g. None, '' or 0) falls back to the unconverted default.
        return self.value_type(candidate) if candidate else self.default_value

    @classmethod
    def set_runtime_options(cls, pipeline_options):
        """Install ``pipeline_options`` as the shared runtime options.

        The assignment targets ``RuntimeValueProvider`` itself rather than
        ``cls``, so the options are stored on this class even when the
        method is invoked through a subclass.
        """
        RuntimeValueProvider.runtime_options = pipeline_options

    def __str__(self):
        """Describe the provider by class name, option, type and default."""
        fields = (
            self.__class__.__name__,
            self.option_name,
            self.value_type.__name__,
            repr(self.default_value),
        )
        return '%s(option: %s, type: %s, default_value: %s)' % fields
def check_accessible(value_provider_list):
    """Build a method decorator that checks ValueProvider accessibility.

    Args:
      value_provider_list: list of attribute names. Each is read from the
        decorated method's ``self`` and must yield an object exposing
        ``is_accessible()``.

    Returns:
      A decorator. The decorated method raises
      ``error.RuntimeValueProviderError`` for the first listed provider
      whose ``is_accessible()`` is falsy, and otherwise calls through to the
      original method.
    """
    assert isinstance(value_provider_list, list)

    def _check_accessible(fnc):
        @wraps(fnc)
        def _guarded(self, *args, **kwargs):
            # Resolve every attribute before checking any of them, so a
            # missing attribute surfaces ahead of an accessibility failure.
            providers = [getattr(self, name) for name in value_provider_list]
            for provider in providers:
                if provider.is_accessible():
                    continue
                raise error.RuntimeValueProviderError(
                    '%s not accessible' % provider)
            return fnc(self, *args, **kwargs)

        return _guarded

    return _check_accessible