# Source code for apache_beam.runners.dataflow.internal.dependency

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Support for installing custom code and required dependencies.

Workflows, with the exception of very simple ones, are organized in multiple
modules and packages. Typically, these modules and packages have
dependencies on other standard libraries. Dataflow relies on the Python
setuptools package to handle these scenarios. For further details please read:
https://pythonhosted.org/an_example_pypi_project/setuptools.html

When a runner tries to run a pipeline it will check for a --requirements_file
and a --setup_file option.

If --setup_file is present then it is assumed that the folder containing the
file specified by the option has the typical layout required by setuptools and
it will run 'python setup.py sdist' to produce a source distribution. The
resulting tarball (a .tar or .tar.gz file) will be staged at the GCS staging
location specified as job option. When a worker starts it will check for the
presence of this file and will run 'easy_install tarball' to install the
package in the worker.

If --requirements_file is present then the file specified by the option will be
staged in the GCS staging location.  When a worker starts it will check for the
presence of this file and will run 'pip install -r requirements.txt'. A
requirements file can be easily generated by running 'pip freeze -r
requirements.txt'. The reason a Dataflow runner does not run this automatically
is because quite often only a small fraction of the dependencies present in a
requirements.txt file are actually needed for remote execution and therefore a
one-time manual trimming is desirable.

TODO(silviuc): Staged files should have a job specific prefix.
To prevent several jobs in the same project stomping on each other due to a
shared staging location.

TODO(silviuc): Should we allow several setup packages?
TODO(silviuc): We should allow customizing the exact command for setup build.
"""

import functools
import glob
import logging
import os
import re
import shutil
import sys
import tempfile

from apache_beam import version as beam_version
from apache_beam.internal import pickler
from import FileSystems
from apache_beam.runners.dataflow.internal import names
from apache_beam.utils import processes
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import SetupOptions

# Update this version to the next version whenever there is a change that will
# require changes to the execution environment.
# NOTE(review): no assignment follows this comment here, yet
# BEAM_CONTAINER_VERSION is referenced further down in this module --
# presumably its definition belongs at this spot; TODO confirm and restore.

# Standard file names used for staging files.
# Name under which the package built from --setup_file is staged.
WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
# Name under which the file given by --requirements_file is staged.
REQUIREMENTS_FILE = 'requirements.txt'
# Staged manifest listing the extra package file names, one per line, in order.
EXTRA_PACKAGES_FILE = 'extra_packages.txt'

# Distribution names looked up to determine the SDK version and package.
GOOGLE_PACKAGE_NAME = 'google-cloud-dataflow'
BEAM_PACKAGE_NAME = 'apache-beam'

def _dependency_file_copy(from_path, to_path):
  """Copies a local file to a GCS file or vice versa.

  Paths starting with 'gs://' are treated as GCS objects, everything else as
  a path on the local file system.

  Args:
    from_path: Path of the file to copy (local or 'gs://').
    to_path: Path of the destination file (local or 'gs://').
  """
  logging.info('file copy from %s to %s.', from_path, to_path)
  from_gcs = from_path.startswith('gs://')
  to_gcs = to_path.startswith('gs://')
  if from_gcs or to_gcs:
    # Imported lazily so that the local-only branch works without GCS support.
    from apache_beam.io.gcp import gcsio
    if from_gcs and to_gcs:
      # Both files are GCS files so copy.
      gcsio.GcsIO().copy(from_path, to_path)
    elif to_gcs:
      # Only target is a GCS file, read local file and upload.
      with open(from_path, 'rb') as f:
        with gcsio.GcsIO().open(to_path, mode='wb') as g:
          pfun = functools.partial(f.read, gcsio.WRITE_CHUNK_SIZE)
          # Files are opened in binary mode, so end-of-file is signalled by
          # an empty bytes object; b'' == '' on Python 2 as well.
          for chunk in iter(pfun, b''):
            g.write(chunk)
    else:
      # Source is a GCS file but target is local file.
      with gcsio.GcsIO().open(from_path, mode='rb') as g:
        with open(to_path, 'wb') as f:
          pfun = functools.partial(g.read, gcsio.DEFAULT_READ_BUFFER_SIZE)
          for chunk in iter(pfun, b''):
            f.write(chunk)
  else:
    # Branch used only for unit tests and integration tests.
    # In such environments GCS support is not available.
    target_dir = os.path.dirname(to_path)
    # A bare file name has an empty dirname; there is nothing to create then.
    if target_dir and not os.path.isdir(target_dir):
      logging.info('Created folder (since we have not done yet, and any errors '
                   'will follow): %s ', target_dir)
      # makedirs also creates missing intermediate folders.
      os.makedirs(target_dir)
    shutil.copyfile(from_path, to_path)

def _dependency_file_download(from_url, to_folder):
  """Downloads a file from a URL and returns path to the local file.

  Args:
    from_url: URL to fetch.
    to_folder: Local folder in which the content is saved as
      'beam-sdk.tar.gz'.

  Returns:
    Path of the downloaded local file.

  Raises:
    RuntimeError: If the response status is 400 or above.
  """
  # TODO(silviuc): We should cache downloads so we do not do it for every job.
  try:
    # We check if the file is actually there because wget returns a file
    # even for a 404 response (file will contain the contents of the 404
    # response).
    response, content = __import__('httplib2').Http().request(from_url)
    if int(response['status']) >= 400:
      raise RuntimeError(
          'Beam SDK not found at %s (response: %s)' % (from_url, response))
    local_download_file = os.path.join(to_folder, 'beam-sdk.tar.gz')
    # The payload is a binary tarball: text mode would corrupt it on platforms
    # that translate newlines and fails outright for bytes on Python 3.
    with open(local_download_file, 'wb') as f:
      f.write(content)
  except Exception:
    # Log which URL failed, then let the original error propagate.
    logging.info('Failed to download Beam SDK from %s', from_url)
    raise
  return local_download_file

def _stage_extra_packages(extra_packages, staging_location, temp_dir,
                          file_copy=_dependency_file_copy):
  """Stages a list of local extra packages.

  Args:
    extra_packages: Ordered list of paths to extra packages to be staged.
      Paths starting with 'gs://' are first downloaded into a temporary
      folder created under temp_dir.
    staging_location: Staging location for the packages.
    temp_dir: Temporary folder where the resource building can happen. Caller
      is responsible for cleaning up this folder after this function returns.
    file_copy: Callable for copying files. It is called as
      file_copy(from_path, to_path).

  Returns:
    A list of file names (no paths) for the resources staged. All the files
    are assumed to be staged in staging_location.

  Raises:
    RuntimeError: If files specified are not found or do not have expected
      name patterns.
  """
  resources = []
  staging_temp_dir = None
  local_packages = []
  for package in extra_packages:
    basename = os.path.basename(package)
    if not basename.endswith(('.tar', '.tar.gz', '.whl')):
      raise RuntimeError(
          'The --extra_package option expects a full path ending with '
          '".tar" or ".tar.gz" instead of %s' % package)
    if basename.endswith('.whl'):
      # The package path is passed as an argument to fill the %s placeholder.
      logging.warning(
          'The .whl package "%s" is provided in --extra_package. '
          'This functionality is not officially supported. Since wheel '
          'packages are binary distributions, this package must be '
          'binary-compatible with the worker environment (e.g. Python 2.7 '
          'running on an x64 Linux host).', package)

    if os.path.isfile(package):
      local_packages.append(package)
    elif package.startswith('gs://'):
      if not staging_temp_dir:
        staging_temp_dir = tempfile.mkdtemp(dir=temp_dir)
      logging.info('Downloading extra package: %s locally before staging',
                   package)
      # The copy target must be a file path, not the folder itself.
      _dependency_file_copy(
          package, os.path.join(staging_temp_dir, basename))
    else:
      raise RuntimeError(
          'The file %s cannot be found. It was specified in the '
          '--extra_packages command line option.' % package)

  if staging_temp_dir:
    # Packages downloaded from GCS are staged after the local ones.
    local_packages.extend(
        [FileSystems.join(staging_temp_dir, f) for f in os.listdir(
            staging_temp_dir)])

  for package in local_packages:
    basename = os.path.basename(package)
    staged_path = FileSystems.join(staging_location, basename)
    file_copy(package, staged_path)
    resources.append(basename)
  # Create a file containing the list of extra packages and stage it.
  # The file is important so that in the worker the packages are installed
  # exactly in the order specified. This approach will avoid extra PyPI
  # requests. For example if package A depends on package B and package A
  # is installed first then the installer will try to satisfy the
  # dependency on B by downloading the package from PyPI. If package B is
  # installed first this is avoided.
  with open(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), 'wt') as f:
    for package in local_packages:
      f.write('%s\n' % os.path.basename(package))
  staged_path = FileSystems.join(staging_location, EXTRA_PACKAGES_FILE)
  # Note that the caller of this function is responsible for deleting the
  # temporary folder where all temp files are created, including this one.
  file_copy(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path)
  resources.append(EXTRA_PACKAGES_FILE)

  return resources

def _get_python_executable():
  """Returns the Python executable used to download and install dependencies.

  A non-empty BEAM_PYTHON environment variable takes precedence; otherwise
  the executable of the current process is used.

  Raises:
    ValueError: If neither source yields a non-empty path.
  """
  override = os.environ.get('BEAM_PYTHON')
  executable = override if override else sys.executable
  if executable:
    return executable
  raise ValueError('Could not find Python executable.')

def _populate_requirements_cache(requirements_file, cache_dir):
  """Downloads the packages listed in a requirements file into cache_dir.

  Args:
    requirements_file: Path to the pip requirements file.
    cache_dir: Folder that receives the downloaded source distributions.
  """
  # The 'pip download' command will not download again if it finds the
  # tarball with the proper version already present.
  # It will get the packages downloaded in the order they are presented in
  # the requirements file and will not download package dependencies.
  # 'pip install --download' was removed from pip; 'pip download --dest' is
  # its replacement.
  cmd_args = [
      _get_python_executable(), '-m', 'pip', 'download', '--dest', cache_dir,
      '-r', requirements_file,
      # Download from PyPI source distributions.
      '--no-binary', ':all:']
  logging.info('Executing command: %s', cmd_args)
  processes.check_call(cmd_args)

def stage_job_resources(
    options, file_copy=_dependency_file_copy, build_setup_args=None,
    temp_dir=None, populate_requirements_cache=_populate_requirements_cache):
  """For internal use only; no backwards-compatibility guarantees.

  Creates (if needed) and stages job resources to options.staging_location.

  Args:
    options: Command line options. More specifically the function will expect
      staging_location, requirements_file, setup_file, and save_main_session
      options to be present.
    file_copy: Callable for copying files. It is called as
      file_copy(from_path, to_path).
    build_setup_args: A list of command line arguments used to build a setup
      package. Used only if options.setup_file is not None. Used only for
      testing.
    temp_dir: Temporary folder where the resource building can happen. If None
      then a unique temp directory will be created. Used only for testing.
      The folder is deleted once staging has completed successfully.
    populate_requirements_cache: Callable for populating the requirements cache.
      Used only for testing.

  Returns:
    A list of file names (no paths) for the resources staged. All the files
    are assumed to be staged in options.staging_location.

  Raises:
    RuntimeError: If files specified are not found or error encountered while
      trying to create the resources (e.g., build a setup package).
  """
  temp_dir = temp_dir or tempfile.mkdtemp()
  resources = []

  google_cloud_options = options.view_as(GoogleCloudOptions)
  setup_options = options.view_as(SetupOptions)
  # Make sure that all required options are specified. There are a few that have
  # defaults to support local running scenarios.
  if google_cloud_options.staging_location is None:
    raise RuntimeError(
        'The --staging_location option must be specified.')
  if google_cloud_options.temp_location is None:
    raise RuntimeError(
        'The --temp_location option must be specified.')

  # Stage a requirements file if present.
  if setup_options.requirements_file is not None:
    if not os.path.isfile(setup_options.requirements_file):
      raise RuntimeError('The file %s cannot be found. It was specified in the '
                         '--requirements_file command line option.' %
                         setup_options.requirements_file)
    staged_path = FileSystems.join(google_cloud_options.staging_location,
                                   REQUIREMENTS_FILE)
    file_copy(setup_options.requirements_file, staged_path)
    resources.append(REQUIREMENTS_FILE)
    requirements_cache_path = (
        os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache')
        if setup_options.requirements_cache is None
        else setup_options.requirements_cache)
    # Populate cache with packages from requirements and stage the files
    # in the cache.
    if not os.path.exists(requirements_cache_path):
      os.makedirs(requirements_cache_path)
    populate_requirements_cache(
        setup_options.requirements_file, requirements_cache_path)
    for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
      file_copy(pkg, FileSystems.join(google_cloud_options.staging_location,
                                      os.path.basename(pkg)))
      resources.append(os.path.basename(pkg))

  # Handle a setup file if present.
  # We will build the setup package locally and then copy it to the staging
  # location because the staging location is a GCS path and the file cannot be
  # created directly there.
  if setup_options.setup_file is not None:
    if not os.path.isfile(setup_options.setup_file):
      raise RuntimeError('The file %s cannot be found. It was specified in the '
                         '--setup_file command line option.' %
                         setup_options.setup_file)
    if os.path.basename(setup_options.setup_file) != 'setup.py':
      raise RuntimeError(
          'The --setup_file option expects the full path to a file named '
          'setup.py instead of %s' % setup_options.setup_file)
    tarball_file = _build_setup_package(setup_options.setup_file, temp_dir,
                                        build_setup_args)
    staged_path = FileSystems.join(google_cloud_options.staging_location,
                                   WORKFLOW_TARBALL_FILE)
    file_copy(tarball_file, staged_path)
    resources.append(WORKFLOW_TARBALL_FILE)

  # Handle extra local packages that should be staged.
  if setup_options.extra_packages is not None:
    resources.extend(
        _stage_extra_packages(setup_options.extra_packages,
                              google_cloud_options.staging_location,
                              temp_dir=temp_dir, file_copy=file_copy))

  # Pickle the main session if requested.
  # We will create the pickled main session locally and then copy it to the
  # staging location because the staging location is a GCS path and the file
  # cannot be created directly there.
  if setup_options.save_main_session:
    pickled_session_file = os.path.join(temp_dir,
                                        names.PICKLED_MAIN_SESSION_FILE)
    pickler.dump_session(pickled_session_file)
    staged_path = FileSystems.join(google_cloud_options.staging_location,
                                   names.PICKLED_MAIN_SESSION_FILE)
    file_copy(pickled_session_file, staged_path)
    resources.append(names.PICKLED_MAIN_SESSION_FILE)

  # Unit tests running in the 'python setup.py test' context will
  # not have the sdk_location attribute present and therefore we
  # will not stage a tarball.
  if hasattr(setup_options, 'sdk_location'):
    sdk_location = setup_options.sdk_location
    staged_path = FileSystems.join(google_cloud_options.staging_location,
                                   names.DATAFLOW_SDK_TARBALL_FILE)
    if sdk_location == 'default':
      # If --sdk_location is not specified then the appropriate package
      # will be obtained from PyPI based on the version of the currently
      # running SDK.
      _stage_beam_sdk_tarball('pypi', staged_path, temp_dir)
      resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
    elif sdk_location.startswith(('gs://', 'http://', 'https://')):
      # If the option is present then no version matching is made and the
      # exact URL or path is expected.
      _stage_beam_sdk_tarball(sdk_location, staged_path, temp_dir)
      resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
    else:
      # Check if we have a local Beam SDK tarball present. The value 'default'
      # never reaches this branch, it is always handled as a remote location.
      if os.path.isdir(sdk_location):
        sdk_path = os.path.join(sdk_location, names.DATAFLOW_SDK_TARBALL_FILE)
      else:
        sdk_path = sdk_location
      if os.path.isfile(sdk_path):
        logging.info('Copying Beam SDK "%s" to staging location.', sdk_path)
        file_copy(sdk_path, staged_path)
        resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
      elif not sdk_location:
        logging.info('Beam SDK will not be staged since --sdk_location '
                     'is empty.')
      else:
        raise RuntimeError(
            'The file "%s" cannot be found. Its location was specified by '
            'the --sdk_location command-line option.' % sdk_path)

  # Delete all temp files created while staging job resources.
  shutil.rmtree(temp_dir)
  return resources
def _build_setup_package(setup_file, temp_dir, build_setup_args=None):
  """Builds a package from a setup file and returns the path of the tarball.

  The build command runs with the folder containing setup_file as the current
  directory; the previous working directory is always restored.

  Args:
    setup_file: Path to the setup file.
    temp_dir: Folder that receives the built distribution.
    build_setup_args: Command (list of arguments) to run instead of the
      default 'sdist' build.

  Returns:
    Path of the first '*.tar.gz' file found in temp_dir after the build.

  Raises:
    RuntimeError: If no '*.tar.gz' file is found in temp_dir after the build.
  """
  saved_current_directory = os.getcwd()
  try:
    # abspath keeps this working for a bare relative 'setup.py', whose dirname
    # would otherwise be '' and make os.chdir fail.
    os.chdir(os.path.dirname(os.path.abspath(setup_file)))
    if build_setup_args is None:
      build_setup_args = [
          _get_python_executable(), os.path.basename(setup_file),
          'sdist', '--dist-dir', temp_dir]
    logging.info('Executing command: %s', build_setup_args)
    processes.check_call(build_setup_args)
    output_files = glob.glob(os.path.join(temp_dir, '*.tar.gz'))
    if not output_files:
      raise RuntimeError(
          'File %s not found.' % os.path.join(temp_dir, '*.tar.gz'))
    return output_files[0]
  finally:
    os.chdir(saved_current_directory)


def _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
  """Stage a Beam SDK tarball with the appropriate version.

  Args:
    sdk_remote_location: A GCS path to a SDK tarball, a URL from which the
      file can be downloaded, or the literal 'pypi'.
    staged_path: GCS path where the found SDK tarball should be copied.
    temp_dir: path to temporary location where the file should be downloaded.

  Raises:
    RuntimeError: If the download from the URL specified returns errors, the
      file cannot be copied from/to GCS, or the location type is unsupported.
  """
  if (sdk_remote_location.startswith('http://') or
      sdk_remote_location.startswith('https://')):
    logging.debug(
        'Staging Beam SDK tarball from %s to %s',
        sdk_remote_location, staged_path)
    local_download_file = _dependency_file_download(
        sdk_remote_location, temp_dir)
    _dependency_file_copy(local_download_file, staged_path)
  elif sdk_remote_location.startswith('gs://'):
    # Stage the file to the GCS staging area.
    logging.debug(
        'Staging Beam SDK tarball from %s to %s',
        sdk_remote_location, staged_path)
    _dependency_file_copy(sdk_remote_location, staged_path)
  elif sdk_remote_location == 'pypi':
    logging.info('Staging the SDK tarball from PyPI to %s', staged_path)
    _dependency_file_copy(_download_pypi_sdk_package(temp_dir), staged_path)
  else:
    raise RuntimeError(
        'The --sdk_location option was used with an unsupported '
        'type of location: %s' % sdk_remote_location)
def get_required_container_version():
  """For internal use only; no backwards-compatibility guarantees.

  Returns the Google Cloud Dataflow container version for remote execution.

  Returns:
    The 'X.Y.Z' release of the installed GOOGLE_PACKAGE_NAME distribution
    (with a '.dev' suffix for dev versions), or BEAM_CONTAINER_VERSION when
    that distribution is not installed.
  """
  # TODO(silviuc): Handle apache-beam versions when we have official releases.
  import pkg_resources as pkg
  try:
    version = pkg.get_distribution(GOOGLE_PACKAGE_NAME).version
  except pkg.DistributionNotFound:
    # This case covers Apache Beam end-to-end testing scenarios. All these tests
    # will run with a special container version.
    return BEAM_CONTAINER_VERSION
  # We drop any pre/post parts of the version and we keep only the X.Y.Z
  # format. For instance the 0.3.0rc2 SDK version translates into 0.3.0.
  release = tuple(pkg.parse_version(version)._version.release)
  # Pad or truncate to exactly three components so that releases such as
  # '2.1' or '2.1.0.1' do not break the format string below.
  release = (release + (0, 0, 0))[:3]
  container_version = '%s.%s.%s' % release
  # We do, however, keep the ".dev" suffix if it is present.
  if re.match(r'.*\.dev[0-9]*$', version):
    container_version += '.dev'
  return container_version
def get_sdk_name_and_version():
  """For internal use only; no backwards-compatibility guarantees.

  Returns name and version of SDK reported to Google Cloud Dataflow.

  Returns:
    A (name, version) tuple. The Apache Beam name and version are used when
    the required container version equals BEAM_CONTAINER_VERSION, otherwise
    the Google Cloud Dataflow name with the container version.
  """
  # TODO(ccy): Make this check cleaner.
  container_version = get_required_container_version()
  if container_version == BEAM_CONTAINER_VERSION:
    return ('Apache Beam SDK for Python', beam_version.__version__)
  return ('Google Cloud Dataflow SDK for Python', container_version)
def get_sdk_package_name():
  """For internal use only; no backwards-compatibility guarantees.

  Returns the PyPI package name to be staged to Google Cloud Dataflow.

  Returns:
    BEAM_PACKAGE_NAME when the required container version equals
    BEAM_CONTAINER_VERSION, otherwise GOOGLE_PACKAGE_NAME.
  """
  container_version = get_required_container_version()
  if container_version == BEAM_CONTAINER_VERSION:
    return BEAM_PACKAGE_NAME
  return GOOGLE_PACKAGE_NAME
def _download_pypi_sdk_package(temp_dir):
  """Downloads SDK package from PyPI and returns path to local path.

  Args:
    temp_dir: Folder into which the source distribution is downloaded.

  Returns:
    Path of the downloaded '.zip' or '.tar.gz' source distribution.

  Raises:
    RuntimeError: If the SDK distribution is not installed locally (so its
      version cannot be determined) or no source distribution with the
      expected name is found after the download.
  """
  package_name = get_sdk_package_name()
  import pkg_resources as pkg
  try:
    version = pkg.get_distribution(package_name).version
  except pkg.DistributionNotFound:
    raise RuntimeError('Please set --sdk_location command-line option '
                       'or install a valid {} distribution.'
                       .format(package_name))

  # Get a source distribution for the SDK package from PyPI.
  # 'pip install --download' was removed from pip; 'pip download --dest' is
  # its replacement.
  cmd_args = [
      _get_python_executable(), '-m', 'pip', 'download', '--dest', temp_dir,
      '%s==%s' % (package_name, version),
      '--no-binary', ':all:', '--no-deps']
  logging.info('Executing command: %s', cmd_args)
  processes.check_call(cmd_args)
  # The source distribution may be published either as a zip or as a tarball.
  zip_expected = os.path.join(
      temp_dir, '%s-%s.zip' % (package_name, version))
  if os.path.exists(zip_expected):
    return zip_expected
  tgz_expected = os.path.join(
      temp_dir, '%s-%s.tar.gz' % (package_name, version))
  if os.path.exists(tgz_expected):
    return tgz_expected
  raise RuntimeError(
      'Failed to download a source distribution for the running SDK. Expected '
      'either %s or %s to be found in the download folder.' % (
          zip_expected, tgz_expected))