#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Environments concepts.
For internal use only. No backwards compatibility guarantees."""
# pytype: skip-file
import json
import logging
import sys
import tempfile
from types import MappingProxyType
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Mapping
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Type
from typing import TypeVar
from typing import Union
from typing import overload
from google.protobuf import message
from apache_beam import coders
from apache_beam.options.pipeline_options import PortableOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability.api import endpoints_pb2
from apache_beam.runners.portability import stager
from apache_beam.transforms.resources import resource_hints_from_options
from apache_beam.utils import proto_utils
if TYPE_CHECKING:
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.pipeline_context import PipelineContext
__all__ = [
'Environment',
'AnyOfEnvironment',
'DefaultEnvironment',
'DockerEnvironment',
'ProcessEnvironment',
'ExternalEnvironment',
'EmbeddedPythonEnvironment',
'EmbeddedPythonGrpcEnvironment',
'SubprocessSDKEnvironment',
'PyPIArtifactRegistry'
]
T = TypeVar('T')
EnvironmentT = TypeVar('EnvironmentT', bound='Environment')
ConstructorFn = Callable[[
Optional[Any],
Iterable[str],
Iterable[beam_runner_api_pb2.ArtifactInformation],
Mapping[str, bytes],
'PipelineContext'
],
Any]
def looks_like_json(s):
import re
return re.match(r'\s*\{.*\}\s*$', s)
APACHE_BEAM_DOCKER_IMAGE_PREFIX = 'apache/beam'
APACHE_BEAM_JAVA_CONTAINER_NAME_PREFIX = 'beam_java'
SDK_VERSION_CAPABILITY_PREFIX = 'beam:version:sdk_base:'
def is_apache_beam_container(container_image):
return container_image and container_image.startswith(
APACHE_BEAM_DOCKER_IMAGE_PREFIX)
[docs]
class Environment(object):
"""Abstract base class for environments.
Represents a type and configuration of environment.
Each type of Environment should have a unique urn.
For internal use only. No backwards compatibility guarantees.
"""
_known_urns = {} # type: Dict[str, Tuple[Optional[type], ConstructorFn]]
_urn_to_env_cls = {} # type: Dict[str, type]
def __init__(self,
capabilities=(), # type: Iterable[str]
artifacts=(), # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
resource_hints=None, # type: Optional[Mapping[str, bytes]]
):
# type: (...) -> None
self._capabilities = capabilities
self._artifacts = sorted(artifacts, key=lambda x: x.SerializeToString())
# Hints on created environments should be immutable since pipeline context
# stores environments in hash maps and we use hints to compute the hash.
self._resource_hints = MappingProxyType(
dict(resource_hints) if resource_hints else {})
def __eq__(self, other):
return (
self.__class__ == other.__class__ and
self._artifacts == other._artifacts
# Assuming that we don't have instances of the same Environment subclass
# with different set of capabilities.
and self._resource_hints == other._resource_hints)
def __hash__(self):
# type: () -> int
return hash((self.__class__, frozenset(self._resource_hints.items())))
[docs]
def artifacts(self):
# type: () -> Iterable[beam_runner_api_pb2.ArtifactInformation]
return self._artifacts
[docs]
def to_runner_api_parameter(self, context):
# type: (PipelineContext) -> Tuple[str, Optional[Union[message.Message, bytes, str]]]
raise NotImplementedError
[docs]
def capabilities(self):
# type: () -> Iterable[str]
return self._capabilities
[docs]
def resource_hints(self):
# type: () -> Mapping[str, bytes]
return self._resource_hints
@classmethod
@overload
def register_urn(
cls,
urn, # type: str
parameter_type, # type: Type[T]
):
# type: (...) -> Callable[[Union[type, Callable[[T, Iterable[str], PipelineContext], Any]]], Callable[[T, Iterable[str], PipelineContext], Any]]
pass
@classmethod
@overload
def register_urn(
cls,
urn, # type: str
parameter_type, # type: None
):
# type: (...) -> Callable[[Union[type, Callable[[bytes, Iterable[str], Iterable[beam_runner_api_pb2.ArtifactInformation], PipelineContext], Any]]], Callable[[bytes, Iterable[str], PipelineContext], Any]]
pass
@classmethod
@overload
def register_urn(cls,
urn, # type: str
parameter_type, # type: Type[T]
constructor # type: Callable[[T, Iterable[str], Iterable[beam_runner_api_pb2.ArtifactInformation], PipelineContext], Any]
):
# type: (...) -> None
pass
@classmethod
@overload
def register_urn(cls,
urn, # type: str
parameter_type, # type: None
constructor # type: Callable[[bytes, Iterable[str], Iterable[beam_runner_api_pb2.ArtifactInformation], PipelineContext], Any]
):
# type: (...) -> None
pass
[docs]
@classmethod
def register_urn(cls, urn, parameter_type, constructor=None):
def register(constructor):
if isinstance(constructor, type):
constructor.from_runner_api_parameter = register(
constructor.from_runner_api_parameter)
# register environment urn to environment class
cls._urn_to_env_cls[urn] = constructor
return constructor
else:
cls._known_urns[urn] = parameter_type, constructor
return staticmethod(constructor)
if constructor:
# Used as a statement.
register(constructor)
else:
# Used as a decorator.
return register
[docs]
@classmethod
def get_env_cls_from_urn(cls, urn):
# type: (str) -> Type[Environment]
return cls._urn_to_env_cls[urn]
[docs]
def to_runner_api(self, context):
# type: (PipelineContext) -> beam_runner_api_pb2.Environment
urn, typed_param = self.to_runner_api_parameter(context)
return beam_runner_api_pb2.Environment(
urn=urn,
payload=typed_param.SerializeToString() if isinstance(
typed_param, message.Message) else typed_param if
(isinstance(typed_param, bytes) or
typed_param is None) else typed_param.encode('utf-8'),
capabilities=self.capabilities(),
dependencies=self.artifacts(),
resource_hints=self.resource_hints())
[docs]
@classmethod
def from_runner_api(cls,
proto, # type: Optional[beam_runner_api_pb2.Environment]
context # type: PipelineContext
):
# type: (...) -> Optional[Environment]
if proto is None or not proto.urn:
return None
parameter_type, constructor = cls._known_urns[proto.urn]
return constructor(
proto_utils.parse_Bytes(proto.payload, parameter_type),
proto.capabilities,
proto.dependencies,
proto.resource_hints,
context)
[docs]
@classmethod
def from_options(cls, options):
# type: (Type[EnvironmentT], PortableOptions) -> EnvironmentT
"""Creates an Environment object from PortableOptions.
Args:
options: The PortableOptions object.
"""
if cls != Environment:
raise NotImplementedError
portable_options = options.view_as(PortableOptions)
environment_type = portable_options.environment_type
if not environment_type:
environment_urn = common_urns.environments.DOCKER.urn
elif environment_type.startswith('beam:env:'):
environment_urn = environment_type
elif environment_type == 'LOOPBACK':
environment_urn = python_urns.EMBEDDED_PYTHON_LOOPBACK
else:
try:
environment_urn = getattr(
common_urns.environments, environment_type).urn
except AttributeError:
raise ValueError('Unknown environment type: %s' % environment_type)
env_class = Environment.get_env_cls_from_urn(environment_urn)
return env_class.from_options(portable_options) # type: ignore
[docs]
@Environment.register_urn(common_urns.environments.DEFAULT.urn, None)
class DefaultEnvironment(Environment):
"""Used as a stub when context is missing a default environment."""
[docs]
def to_runner_api_parameter(self, context):
return common_urns.environments.DEFAULT.urn, None
[docs]
@staticmethod
def from_runner_api_parameter(payload, # type: beam_runner_api_pb2.DockerPayload
capabilities, # type: Iterable[str]
artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
resource_hints, # type: Mapping[str, bytes]
context # type: PipelineContext
):
# type: (...) -> DefaultEnvironment
return DefaultEnvironment(
capabilities=capabilities,
artifacts=artifacts,
resource_hints=resource_hints)
[docs]
@Environment.register_urn(
common_urns.environments.DOCKER.urn, beam_runner_api_pb2.DockerPayload)
class DockerEnvironment(Environment):
def __init__(
self,
container_image=None, # type: Optional[str]
capabilities=(), # type: Iterable[str]
artifacts=(), # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
resource_hints=None, # type: Optional[Mapping[str, bytes]]
):
super().__init__(capabilities, artifacts, resource_hints)
if container_image:
logging.debug(
'Using provided Python SDK container image: %s' % (container_image))
self.container_image = container_image
else:
logging.debug('No image given, using default Python SDK image')
self.container_image = self.default_docker_image()
logging.debug(
'Python SDK container image set to "%s" for Docker environment' %
(self.container_image))
def __eq__(self, other):
return (
super().__eq__(other) and self.container_image == other.container_image)
def __hash__(self):
return hash((super().__hash__(), self.container_image))
def __repr__(self):
return 'DockerEnvironment(container_image=%s)' % self.container_image
[docs]
def to_runner_api_parameter(self, context):
# type: (PipelineContext) -> Tuple[str, beam_runner_api_pb2.DockerPayload]
return (
common_urns.environments.DOCKER.urn,
beam_runner_api_pb2.DockerPayload(container_image=self.container_image))
[docs]
@staticmethod
def from_runner_api_parameter(payload, # type: beam_runner_api_pb2.DockerPayload
capabilities, # type: Iterable[str]
artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
resource_hints, # type: Mapping[str, bytes]
context # type: PipelineContext
):
# type: (...) -> DockerEnvironment
return DockerEnvironment(
container_image=payload.container_image,
capabilities=capabilities,
artifacts=artifacts,
resource_hints=resource_hints)
[docs]
@classmethod
def from_options(cls, options):
# type: (PortableOptions) -> DockerEnvironment
if options.view_as(SetupOptions).prebuild_sdk_container_engine:
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam.runners.portability.sdk_container_builder import SdkContainerImageBuilder
prebuilt_container_image = SdkContainerImageBuilder.build_container_image(
options)
return cls.from_container_image(
container_image=prebuilt_container_image,
artifacts=python_sdk_dependencies(options),
resource_hints=resource_hints_from_options(options),
)
return cls.from_container_image(
container_image=options.lookup_environment_option(
'docker_container_image') or options.environment_config,
artifacts=python_sdk_dependencies(options),
resource_hints=resource_hints_from_options(options),
)
[docs]
@classmethod
def from_container_image(
cls, container_image, artifacts=(), resource_hints=None):
# type: (str, Iterable[beam_runner_api_pb2.ArtifactInformation], Optional[Mapping[str, bytes]]) -> DockerEnvironment
return cls(
container_image=container_image,
capabilities=python_sdk_docker_capabilities(),
artifacts=artifacts,
resource_hints=resource_hints)
[docs]
@staticmethod
def default_docker_image():
# type: () -> str
from apache_beam import version as beam_version
sdk_version = beam_version.__version__
version_suffix = '.'.join([str(i) for i in sys.version_info[0:2]])
image = (
APACHE_BEAM_DOCKER_IMAGE_PREFIX +
'_python{version_suffix}_sdk:{tag}'.format(
version_suffix=version_suffix, tag=sdk_version))
return image
[docs]
@Environment.register_urn(
common_urns.environments.PROCESS.urn, beam_runner_api_pb2.ProcessPayload)
class ProcessEnvironment(Environment):
def __init__(
self,
command, # type: str
os='', # type: str
arch='', # type: str
env=None, # type: Optional[Mapping[str, str]]
capabilities=(), # type: Iterable[str]
artifacts=(), # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
resource_hints=None, # type: Optional[Mapping[str, bytes]]
):
# type: (...) -> None
super().__init__(capabilities, artifacts, resource_hints)
self.command = command
self.os = os
self.arch = arch
self.env = env or {}
def __eq__(self, other):
return (
super().__eq__(other) and self.command == other.command and
self.os == other.os and self.arch == other.arch and
self.env == other.env)
def __hash__(self):
# type: () -> int
return hash((
super().__hash__(),
self.command,
self.os,
self.arch,
frozenset(self.env.items())))
def __repr__(self):
# type: () -> str
repr_parts = ['command=%s' % self.command]
if self.os:
repr_parts.append('os=%s' % self.os)
if self.arch:
repr_parts.append('arch=%s' % self.arch)
repr_parts.append('env=%s' % self.env)
return 'ProcessEnvironment(%s)' % ','.join(repr_parts)
[docs]
def to_runner_api_parameter(self, context):
# type: (PipelineContext) -> Tuple[str, beam_runner_api_pb2.ProcessPayload]
return (
common_urns.environments.PROCESS.urn,
beam_runner_api_pb2.ProcessPayload(
os=self.os, arch=self.arch, command=self.command, env=self.env))
[docs]
@staticmethod
def from_runner_api_parameter(payload,
capabilities, # type: Iterable[str]
artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
resource_hints, # type: Mapping[str, bytes]
context # type: PipelineContext
):
# type: (...) -> ProcessEnvironment
return ProcessEnvironment(
command=payload.command,
os=payload.os,
arch=payload.arch,
env=payload.env,
capabilities=capabilities,
artifacts=artifacts,
resource_hints=resource_hints,
)
[docs]
@staticmethod
def parse_environment_variables(variables):
env = {}
for var in variables:
try:
name, value = var.split('=', 1)
env[name] = value
except ValueError:
raise ValueError(
'Invalid process_variables "%s" (expected assignment in the '
'form "FOO=bar").' % var)
return env
[docs]
@classmethod
def from_options(cls, options):
# type: (PortableOptions) -> ProcessEnvironment
if options.environment_config:
config = json.loads(options.environment_config)
return cls(
config.get('command'),
os=config.get('os', ''),
arch=config.get('arch', ''),
env=config.get('env', ''),
capabilities=python_sdk_capabilities(),
artifacts=python_sdk_dependencies(options),
resource_hints=resource_hints_from_options(options),
)
env = cls.parse_environment_variables(
options.lookup_environment_option('process_variables').split(',')
if options.lookup_environment_option('process_variables') else [])
return cls(
options.lookup_environment_option('process_command'),
env=env,
capabilities=python_sdk_capabilities(),
artifacts=python_sdk_dependencies(options),
resource_hints=resource_hints_from_options(options),
)
[docs]
@Environment.register_urn(
common_urns.environments.EXTERNAL.urn, beam_runner_api_pb2.ExternalPayload)
class ExternalEnvironment(Environment):
def __init__(
self,
url, # type: str
params=None, # type: Optional[Mapping[str, str]]
capabilities=(), # type: Iterable[str]
artifacts=(), # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
resource_hints=None, # type: Optional[Mapping[str, bytes]]
):
super().__init__(capabilities, artifacts, resource_hints)
self.url = url
self.params = params
def __eq__(self, other):
return (
super().__eq__(other) and self.url == other.url and
self.params == other.params)
def __hash__(self):
# type: () -> int
return hash((
super().__hash__(),
self.url,
frozenset(self.params.items()) if self.params is not None else None))
def __repr__(self):
# type: () -> str
return 'ExternalEnvironment(url=%s,params=%s)' % (self.url, self.params)
[docs]
def to_runner_api_parameter(self, context):
# type: (PipelineContext) -> Tuple[str, beam_runner_api_pb2.ExternalPayload]
return (
common_urns.environments.EXTERNAL.urn,
beam_runner_api_pb2.ExternalPayload(
endpoint=endpoints_pb2.ApiServiceDescriptor(url=self.url),
params=self.params))
[docs]
@staticmethod
def from_runner_api_parameter(payload, # type: beam_runner_api_pb2.ExternalPayload
capabilities, # type: Iterable[str]
artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
resource_hints, # type: Mapping[str, bytes]
context # type: PipelineContext
):
# type: (...) -> ExternalEnvironment
return ExternalEnvironment(
payload.endpoint.url,
params=payload.params or None,
capabilities=capabilities,
artifacts=artifacts,
resource_hints=resource_hints)
[docs]
@classmethod
def from_options(cls, options):
# type: (PortableOptions) -> ExternalEnvironment
if looks_like_json(options.environment_config):
config = json.loads(options.environment_config)
url = config.get('url')
if not url:
raise ValueError('External environment endpoint must be set.')
params = config.get('params')
elif options.environment_config:
url = options.environment_config
params = None
else:
url = options.lookup_environment_option('external_service_address')
params = None
return cls(
url,
params=params,
capabilities=python_sdk_capabilities(),
artifacts=python_sdk_dependencies(options),
resource_hints=resource_hints_from_options(options))
def expand_anyof_environments(env_proto):
if env_proto.urn == common_urns.environments.ANYOF.urn:
for alt in beam_runner_api_pb2.AnyOfEnvironmentPayload.FromString(
env_proto.payload).environments:
yield from expand_anyof_environments(alt)
else:
yield env_proto
def resolve_anyof_environment(env_proto, *preferred_types):
all_environments = list(expand_anyof_environments(env_proto))
for preferred_type in preferred_types:
for env in all_environments:
if env.urn == preferred_type:
return env
return all_environments[0]
[docs]
@Environment.register_urn(python_urns.EMBEDDED_PYTHON, None)
class EmbeddedPythonEnvironment(Environment):
[docs]
def to_runner_api_parameter(self, context):
# type: (PipelineContext) -> Tuple[str, None]
return python_urns.EMBEDDED_PYTHON, None
[docs]
@staticmethod
def from_runner_api_parameter(unused_payload, # type: None
capabilities, # type: Iterable[str]
artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
resource_hints, # type: Mapping[str, bytes]
context # type: PipelineContext
):
# type: (...) -> EmbeddedPythonEnvironment
return EmbeddedPythonEnvironment(capabilities, artifacts, resource_hints)
[docs]
@classmethod
def from_options(cls, options):
# type: (PortableOptions) -> EmbeddedPythonEnvironment
return cls(
capabilities=python_sdk_capabilities(),
artifacts=python_sdk_dependencies(options),
resource_hints=resource_hints_from_options(options),
)
[docs]
@classmethod
def default(cls):
# type: () -> EmbeddedPythonEnvironment
return cls(capabilities=python_sdk_capabilities(), artifacts=())
[docs]
@Environment.register_urn(python_urns.EMBEDDED_PYTHON_GRPC, bytes)
class EmbeddedPythonGrpcEnvironment(Environment):
def __init__(
self,
state_cache_size=None,
data_buffer_time_limit_ms=None,
capabilities=(),
artifacts=(),
resource_hints=None,
):
super().__init__(capabilities, artifacts, resource_hints)
self.state_cache_size = state_cache_size
self.data_buffer_time_limit_ms = data_buffer_time_limit_ms
def __eq__(self, other):
return (
super().__eq__(other) and
self.state_cache_size == other.state_cache_size and
self.data_buffer_time_limit_ms == other.data_buffer_time_limit_ms)
def __hash__(self):
# type: () -> int
return hash((
super().__hash__(),
self.state_cache_size,
self.data_buffer_time_limit_ms))
def __repr__(self):
# type: () -> str
repr_parts = []
if not self.state_cache_size is None:
repr_parts.append('state_cache_size=%d' % self.state_cache_size)
if not self.data_buffer_time_limit_ms is None:
repr_parts.append(
'data_buffer_time_limit_ms=%d' % self.data_buffer_time_limit_ms)
return 'EmbeddedPythonGrpcEnvironment(%s)' % ','.join(repr_parts)
[docs]
def to_runner_api_parameter(self, context):
# type: (PipelineContext) -> Tuple[str, bytes]
params = {}
if self.state_cache_size is not None:
params['state_cache_size'] = self.state_cache_size
if self.data_buffer_time_limit_ms is not None:
params['data_buffer_time_limit_ms'] = self.data_buffer_time_limit_ms
payload = json.dumps(params).encode('utf-8')
return python_urns.EMBEDDED_PYTHON_GRPC, payload
[docs]
@staticmethod
def from_runner_api_parameter(payload, # type: bytes
capabilities, # type: Iterable[str]
artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
resource_hints, # type: Mapping[str, bytes]
context # type: PipelineContext
):
# type: (...) -> EmbeddedPythonGrpcEnvironment
if payload:
config = EmbeddedPythonGrpcEnvironment.parse_config(
payload.decode('utf-8'))
return EmbeddedPythonGrpcEnvironment(
state_cache_size=config.get('state_cache_size'),
data_buffer_time_limit_ms=config.get('data_buffer_time_limit_ms'),
capabilities=capabilities,
artifacts=artifacts,
resource_hints=resource_hints)
else:
return EmbeddedPythonGrpcEnvironment()
[docs]
@classmethod
def from_options(cls, options):
# type: (PortableOptions) -> EmbeddedPythonGrpcEnvironment
if options.environment_config:
config = EmbeddedPythonGrpcEnvironment.parse_config(
options.environment_config)
return cls(
state_cache_size=config.get('state_cache_size'),
data_buffer_time_limit_ms=config.get('data_buffer_time_limit_ms'),
capabilities=python_sdk_capabilities(),
artifacts=python_sdk_dependencies(options))
else:
return cls(
capabilities=python_sdk_capabilities(),
artifacts=python_sdk_dependencies(options),
resource_hints=resource_hints_from_options(options))
[docs]
@staticmethod
def parse_config(s):
# type: (str) -> Dict[str, Any]
if looks_like_json(s):
config_dict = json.loads(s)
if 'state_cache_size' in config_dict:
config_dict['state_cache_size'] = int(config_dict['state_cache_size'])
if 'data_buffer_time_limit_ms' in config_dict:
config_dict['data_buffer_time_limit_ms'] = \
int(config_dict['data_buffer_time_limit_ms'])
return config_dict
else:
return {'state_cache_size': int(s)}
[docs]
@classmethod
def default(cls):
# type: () -> EmbeddedPythonGrpcEnvironment
return cls(capabilities=python_sdk_capabilities(), artifacts=())
@Environment.register_urn(python_urns.EMBEDDED_PYTHON_LOOPBACK, None)
class PythonLoopbackEnvironment(EmbeddedPythonEnvironment):
"""Used as a stub when the loopback worker has not yet been started."""
def to_runner_api_parameter(self, context):
# type: (PipelineContext) -> Tuple[str, None]
return python_urns.EMBEDDED_PYTHON_LOOPBACK, None
@staticmethod
def from_runner_api_parameter(unused_payload, # type: None
capabilities, # type: Iterable[str]
artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
resource_hints, # type: Mapping[str, bytes]
context # type: PipelineContext
):
# type: (...) -> PythonLoopbackEnvironment
return PythonLoopbackEnvironment(
capabilities=capabilities,
artifacts=artifacts,
resource_hints=resource_hints)
[docs]
@Environment.register_urn(python_urns.SUBPROCESS_SDK, bytes)
class SubprocessSDKEnvironment(Environment):
def __init__(
self,
command_string, # type: str
capabilities=(), # type: Iterable[str]
artifacts=(), # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
resource_hints=None, # type: Optional[Mapping[str, bytes]]
):
super().__init__(capabilities, artifacts, resource_hints)
self.command_string = command_string
def __eq__(self, other):
return (
super().__eq__(other) and self.command_string == other.command_string)
def __hash__(self):
# type: () -> int
return hash((super().__hash__(), self.command_string))
def __repr__(self):
# type: () -> str
return 'SubprocessSDKEnvironment(command_string=%s)' % self.command_string
[docs]
def to_runner_api_parameter(self, context):
# type: (PipelineContext) -> Tuple[str, bytes]
return python_urns.SUBPROCESS_SDK, self.command_string.encode('utf-8')
[docs]
@staticmethod
def from_runner_api_parameter(payload, # type: bytes
capabilities, # type: Iterable[str]
artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
resource_hints, # type: Mapping[str, bytes]
context # type: PipelineContext
):
# type: (...) -> SubprocessSDKEnvironment
return SubprocessSDKEnvironment(
payload.decode('utf-8'), capabilities, artifacts, resource_hints)
[docs]
@classmethod
def from_options(cls, options):
# type: (PortableOptions) -> SubprocessSDKEnvironment
return cls(
options.environment_config,
capabilities=python_sdk_capabilities(),
artifacts=python_sdk_dependencies(options),
resource_hints=resource_hints_from_options(options))
[docs]
@classmethod
def from_command_string(cls, command_string):
# type: (str) -> SubprocessSDKEnvironment
return cls(
command_string, capabilities=python_sdk_capabilities(), artifacts=())
[docs]
@Environment.register_urn(
common_urns.environments.ANYOF.urn,
beam_runner_api_pb2.AnyOfEnvironmentPayload)
class AnyOfEnvironment(Environment):
def __init__(self, environments):
self._environments = environments
[docs]
def to_runner_api_parameter(self, context):
# type: (PipelineContext) -> Tuple[str, beam_runner_api_pb2.AnyOfEnvironmentPayload]
return (
common_urns.environments.ANYOF.urn,
beam_runner_api_pb2.AnyOfEnvironmentPayload(
environments=[
env.to_runner_api(context) for env in self._environments
]))
[docs]
@staticmethod
def from_runner_api_parameter(payload, # type: beam_runner_api_pb2.AnyOfEnvironmentPayload
capabilities, # type: Iterable[str]
artifacts, # type: Iterable[beam_runner_api_pb2.ArtifactInformation]
resource_hints, # type: Mapping[str, bytes]
context # type: PipelineContext
):
# type: (...) -> AnyOfEnvironment
return AnyOfEnvironment([
Environment.from_runner_api(env, context)
for env in payload.environments
])
[docs]
@staticmethod
def create_proto(
environments: Iterable[beam_runner_api_pb2.Environment]
) -> beam_runner_api_pb2.Environment:
return beam_runner_api_pb2.Environment(
urn=common_urns.environments.ANYOF.urn,
payload=beam_runner_api_pb2.AnyOfEnvironmentPayload(
environments=environments).SerializeToString())
[docs]
class PyPIArtifactRegistry(object):
_registered_artifacts = set() # type: Set[Tuple[str, str]]
[docs]
@classmethod
def register_artifact(cls, name, version):
cls._registered_artifacts.add((name, version))
[docs]
@classmethod
def get_artifacts(cls):
for artifact in cls._registered_artifacts:
yield artifact
def python_sdk_capabilities():
# type: () -> List[str]
return list(_python_sdk_capabilities_iter())
def python_sdk_docker_capabilities():
return python_sdk_capabilities() + [common_urns.protocols.SIBLING_WORKERS.urn]
def sdk_base_version_capability():
return (
SDK_VERSION_CAPABILITY_PREFIX + DockerEnvironment.default_docker_image())
def _python_sdk_capabilities_iter():
# type: () -> Iterator[str]
for urn_spec in common_urns.coders.__dict__.values():
if getattr(urn_spec, 'urn', None) in coders.Coder._known_urns:
yield urn_spec.urn
yield common_urns.protocols.LEGACY_PROGRESS_REPORTING.urn
yield common_urns.protocols.HARNESS_MONITORING_INFOS.urn
yield common_urns.protocols.WORKER_STATUS.urn
yield python_urns.PACKED_COMBINE_FN
yield sdk_base_version_capability()
yield common_urns.sdf_components.TRUNCATE_SIZED_RESTRICTION.urn
yield common_urns.primitives.TO_STRING.urn
yield common_urns.protocols.DATA_SAMPLING.urn
yield common_urns.protocols.SDK_CONSUMING_RECEIVED_DATA.urn
yield common_urns.protocols.ORDERED_LIST_STATE.urn
def python_sdk_dependencies(options, tmp_dir=None):
if tmp_dir is None:
tmp_dir = tempfile.mkdtemp()
skip_prestaged_dependencies = options.view_as(
SetupOptions).prebuild_sdk_container_engine is not None
return stager.Stager.create_job_resources(
options,
tmp_dir,
pypi_requirements=[
artifact[0] + artifact[1]
for artifact in PyPIArtifactRegistry.get_artifacts()
],
skip_prestaged_dependencies=skip_prestaged_dependencies)