#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Environments concepts.
For internal use only. No backwards compatibility guarantees."""
from __future__ import absolute_import
import json
import logging
import sys
from google.protobuf import message
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability.api import endpoints_pb2
from apache_beam.utils import proto_utils
__all__ = ['Environment',
'DockerEnvironment', 'ProcessEnvironment', 'ExternalEnvironment',
'EmbeddedPythonEnvironment', 'EmbeddedPythonGrpcEnvironment',
'SubprocessSDKEnvironment', 'RunnerAPIEnvironmentHolder']
[docs]class Environment(object):
"""Abstract base class for environments.
Represents a type and configuration of environment.
Each type of Environment should have a unique urn.
For internal use only. No backwards compatibility guarantees.
"""
_known_urns = {}
_urn_to_env_cls = {}
[docs] def to_runner_api_parameter(self, context):
raise NotImplementedError
[docs] @classmethod
def register_urn(cls, urn, parameter_type, constructor=None):
def register(constructor):
if isinstance(constructor, type):
constructor.from_runner_api_parameter = register(
constructor.from_runner_api_parameter)
# register environment urn to environment class
cls._urn_to_env_cls[urn] = constructor
return constructor
else:
cls._known_urns[urn] = parameter_type, constructor
return staticmethod(constructor)
if constructor:
# Used as a statement.
register(constructor)
else:
# Used as a decorator.
return register
[docs] @classmethod
def get_env_cls_from_urn(cls, urn):
return cls._urn_to_env_cls[urn]
[docs] def to_runner_api(self, context):
urn, typed_param = self.to_runner_api_parameter(context)
return beam_runner_api_pb2.Environment(
urn=urn,
payload=typed_param.SerializeToString()
if isinstance(typed_param, message.Message)
else typed_param if (isinstance(typed_param, bytes) or
typed_param is None)
else typed_param.encode('utf-8')
)
[docs] @classmethod
def from_runner_api(cls, proto, context):
if proto is None or not proto.urn:
return None
parameter_type, constructor = cls._known_urns[proto.urn]
try:
return constructor(
proto_utils.parse_Bytes(proto.payload, parameter_type),
context)
except Exception:
if context.allow_proto_holders:
return RunnerAPIEnvironmentHolder(proto)
raise
[docs] @classmethod
def from_options(cls, options):
"""Creates an Environment object from PipelineOptions.
Args:
options: The PipelineOptions object.
"""
raise NotImplementedError
[docs]@Environment.register_urn(common_urns.environments.DOCKER.urn,
beam_runner_api_pb2.DockerPayload)
class DockerEnvironment(Environment):
def __init__(self, container_image=None):
if container_image:
self.container_image = container_image
else:
self.container_image = self.default_docker_image()
def __eq__(self, other):
return self.__class__ == other.__class__ \
and self.container_image == other.container_image
def __ne__(self, other):
# TODO(BEAM-5949): Needed for Python 2 compatibility.
return not self == other
def __hash__(self):
return hash((self.__class__, self.container_image))
def __repr__(self):
return 'DockerEnvironment(container_image=%s)' % self.container_image
[docs] def to_runner_api_parameter(self, context):
return (common_urns.environments.DOCKER.urn,
beam_runner_api_pb2.DockerPayload(
container_image=self.container_image))
[docs] @staticmethod
def from_runner_api_parameter(payload, context):
return DockerEnvironment(container_image=payload.container_image)
[docs] @classmethod
def from_options(cls, options):
return cls(container_image=options.environment_config)
[docs] @staticmethod
def default_docker_image():
from apache_beam import version as beam_version
sdk_version = beam_version.__version__
version_suffix = '.'.join([str(i) for i in sys.version_info[0:2]])
logging.warning('Make sure that locally built Python SDK docker image '
'has Python %d.%d interpreter.' % (
sys.version_info[0], sys.version_info[1]))
image = ('apachebeam/python{version_suffix}_sdk:{tag}'.format(
version_suffix=version_suffix, tag=sdk_version))
logging.info(
'Using Python SDK docker image: %s. If the image is not '
'available at local, we will try to pull from hub.docker.com'
% (image))
return image
[docs]@Environment.register_urn(common_urns.environments.PROCESS.urn,
beam_runner_api_pb2.ProcessPayload)
class ProcessEnvironment(Environment):
def __init__(self, command, os='', arch='', env=None):
self.command = command
self.os = os
self.arch = arch
self.env = env or {}
def __eq__(self, other):
return self.__class__ == other.__class__ \
and self.command == other.command and self.os == other.os \
and self.arch == other.arch and self.env == other.env
def __ne__(self, other):
# TODO(BEAM-5949): Needed for Python 2 compatibility.
return not self == other
def __hash__(self):
return hash((self.__class__, self.command, self.os, self.arch,
frozenset(self.env.items())))
def __repr__(self):
repr_parts = ['command=%s' % self.command]
if self.os:
repr_parts.append('os=%s'% self.os)
if self.arch:
repr_parts.append('arch=%s' % self.arch)
repr_parts.append('env=%s' % self.env)
return 'ProcessEnvironment(%s)' % ','.join(repr_parts)
[docs] def to_runner_api_parameter(self, context):
return (common_urns.environments.PROCESS.urn,
beam_runner_api_pb2.ProcessPayload(
os=self.os,
arch=self.arch,
command=self.command,
env=self.env))
[docs] @staticmethod
def from_runner_api_parameter(payload, context):
return ProcessEnvironment(command=payload.command, os=payload.os,
arch=payload.arch, env=payload.env)
[docs] @classmethod
def from_options(cls, options):
config = json.loads(options.environment_config)
return cls(config.get('command'), os=config.get('os', ''),
arch=config.get('arch', ''), env=config.get('env', ''))
[docs]@Environment.register_urn(common_urns.environments.EXTERNAL.urn,
beam_runner_api_pb2.ExternalPayload)
class ExternalEnvironment(Environment):
def __init__(self, url, params=None):
self.url = url
self.params = params
def __eq__(self, other):
return self.__class__ == other.__class__ and self.url == other.url \
and self.params == other.params
def __ne__(self, other):
# TODO(BEAM-5949): Needed for Python 2 compatibility.
return not self == other
def __hash__(self):
params = self.params
if params is not None:
params = frozenset(self.params.items())
return hash((self.__class__, self.url, params))
def __repr__(self):
return 'ExternalEnvironment(url=%s,params=%s)' % (self.url, self.params)
[docs] def to_runner_api_parameter(self, context):
return (common_urns.environments.EXTERNAL.urn,
beam_runner_api_pb2.ExternalPayload(
endpoint=endpoints_pb2.ApiServiceDescriptor(url=self.url),
params=self.params
))
[docs] @staticmethod
def from_runner_api_parameter(payload, context):
return ExternalEnvironment(payload.endpoint.url,
params=payload.params or None)
[docs] @classmethod
def from_options(cls, options):
def looks_like_json(environment_config):
import re
return re.match(r'\s*\{.*\}\s*$', environment_config)
if looks_like_json(options.environment_config):
config = json.loads(options.environment_config)
url = config.get('url')
if not url:
raise ValueError('External environment endpoint must be set.')
params = config.get('params')
else:
url = options.environment_config
params = None
return cls(url, params=params)
[docs]@Environment.register_urn(python_urns.EMBEDDED_PYTHON, None)
class EmbeddedPythonEnvironment(Environment):
def __eq__(self, other):
return self.__class__ == other.__class__
def __ne__(self, other):
# TODO(BEAM-5949): Needed for Python 2 compatibility.
return not self == other
def __hash__(self):
return hash(self.__class__)
[docs] def to_runner_api_parameter(self, context):
return python_urns.EMBEDDED_PYTHON, None
[docs] @staticmethod
def from_runner_api_parameter(unused_payload, context):
return EmbeddedPythonEnvironment()
[docs] @classmethod
def from_options(cls, options):
return cls()
[docs]@Environment.register_urn(python_urns.EMBEDDED_PYTHON_GRPC, bytes)
class EmbeddedPythonGrpcEnvironment(Environment):
def __init__(self, state_cache_size=None):
self.state_cache_size = state_cache_size
def __eq__(self, other):
return self.__class__ == other.__class__ \
and self.state_cache_size == other.state_cache_size
def __ne__(self, other):
# TODO(BEAM-5949): Needed for Python 2 compatibility.
return not self == other
def __hash__(self):
return hash((self.__class__, self.state_cache_size))
def __repr__(self):
repr_parts = []
if not self.state_cache_size is None:
repr_parts.append('state_cache_size=%d' % self.state_cache_size)
return 'EmbeddedPythonGrpcEnvironment(%s)' % ','.join(repr_parts)
[docs] def to_runner_api_parameter(self, context):
if self.state_cache_size is None:
payload = b''
else:
payload = b'%d' % self.state_cache_size
return python_urns.EMBEDDED_PYTHON_GRPC, payload
[docs] @staticmethod
def from_runner_api_parameter(payload, context):
if payload:
state_cache_size = payload.decode('utf-8')
return EmbeddedPythonGrpcEnvironment(
state_cache_size=int(state_cache_size))
else:
return EmbeddedPythonGrpcEnvironment()
[docs] @classmethod
def from_options(cls, options):
if options.environment_config:
state_cache_size = options.environment_config
return cls(state_cache_size=state_cache_size)
else:
return cls()
[docs]@Environment.register_urn(python_urns.SUBPROCESS_SDK, bytes)
class SubprocessSDKEnvironment(Environment):
def __init__(self, command_string):
self.command_string = command_string
def __eq__(self, other):
return self.__class__ == other.__class__ \
and self.command_string == other.command_string
def __ne__(self, other):
# TODO(BEAM-5949): Needed for Python 2 compatibility.
return not self == other
def __hash__(self):
return hash((self.__class__, self.command_string))
def __repr__(self):
return 'SubprocessSDKEnvironment(command_string=%s)' % self.container_string
[docs] def to_runner_api_parameter(self, context):
return python_urns.SUBPROCESS_SDK, self.command_string.encode('utf-8')
[docs] @staticmethod
def from_runner_api_parameter(payload, context):
return SubprocessSDKEnvironment(payload.decode('utf-8'))
[docs] @classmethod
def from_options(cls, options):
return cls(options.environment_config)
[docs]class RunnerAPIEnvironmentHolder(Environment):
def __init__(self, proto):
self.proto = proto
[docs] def to_runner_api(self, context):
return self.proto
def __eq__(self, other):
return self.__class__ == other.__class__ and self.proto == other.proto
def __ne__(self, other):
# TODO(BEAM-5949): Needed for Python 2 compatibility.
return not self == other
def __hash__(self):
return hash((self.__class__, self.proto))