# Source code for apache_beam.utils.transform_service_launcher

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import zipfile
from pathlib import Path

import grpc

from apache_beam.utils import subprocess_server

_LOGGER = logging.getLogger(__name__)

_COMMAND_POSSIBLE_VALUES = ['up', 'down', 'ps']

_EXPANSION_SERVICE_LAUNCHER_JAR = ':sdks:java:transform-service:app:build'


class TransformServiceLauncher(object):
  """Controls a Beam Transform Service run through Docker Compose.

  On construction the launcher extracts ``docker-compose.yml`` and ``.env``
  from the transform service launcher jar into a per-project directory under
  the system temp directory, prepares the credentials and dependencies
  volumes, and records the environment variables handed to ``docker-compose``.

  Instances are cached per ``project_name``: constructing a launcher with a
  project name that was used before returns the same object. Python still
  calls ``__init__`` on that cached object, so its configuration is refreshed
  with the newly supplied arguments.

  The launcher can be used as a context manager; entering starts the service,
  waits for it to accept connections and returns an
  ``ExpansionAndArtifactRetrievalStub`` bound to a channel to the service.
  """
  _DEFAULT_PROJECT_NAME = 'apache.beam.transform.service'
  # Default time to wait for the service to come up, in milliseconds.
  _DEFAULT_START_WAIT_TIMEOUT = 50000

  _launchers = {}  # type: ignore

  # Maintaining a static list of launchers to prevent temporary resources
  # from being created unnecessarily.
  def __new__(cls, project_name, port, beam_version=None):
    """Returns the cached launcher for ``project_name``, creating it once."""
    if project_name not in TransformServiceLauncher._launchers:
      TransformServiceLauncher._launchers[project_name] = super(
          TransformServiceLauncher, cls).__new__(cls)
    return TransformServiceLauncher._launchers[project_name]

  def __init__(self, project_name, port, beam_version=None):
    """Prepares the Docker Compose project directory and environment.

    Args:
      project_name: Docker Compose project name; also used as the name of the
        working directory created under the system temp directory.
      port: port of the transform service on localhost.
      beam_version: value exported to ``docker-compose`` as ``BEAM_VERSION``.
        When ``None`` the variable is not exported at all (presumably Docker
        Compose then uses whatever the extracted ``.env`` file provides --
        TODO confirm).
    """
    logging.info('Initializing the Beam Transform Service %s.', project_name)

    self._project_name = project_name
    self._port = port
    self._address = 'localhost:' + str(self._port)

    # __init__ runs again every time __new__ hands back the cached instance.
    # Keep the existing lock so that threads already synchronizing on it are
    # not silently moved onto a different lock object.
    if not hasattr(self, '_launcher_lock'):
      self._launcher_lock = threading.RLock()

    # NOTE(review): placeholder attribute ('TODO path'); nothing in this class
    # reads it. Kept because it is a public attribute.
    self.docker_compose_command_prefix = [
        'docker-compose', '-p', project_name, '-f', 'TODO path'
    ]

    # Setting up Docker Compose configuration.

    # We use Docker Compose project name as the name of the temporary directory
    # to isolate different transform service instances that may be running in
    # the same machine.
    temp_dir = os.path.join(tempfile.gettempdir(), project_name)
    os.makedirs(temp_dir, exist_ok=True)

    # Get the jar with configs
    path_to_local_jar = subprocess_server.JavaJarServer.local_jar(
        subprocess_server.JavaJarServer.path_to_beam_jar(
            _EXPANSION_SERVICE_LAUNCHER_JAR))

    with zipfile.ZipFile(path_to_local_jar) as launcher_jar:
      launcher_jar.extract('docker-compose.yml', path=temp_dir)
      launcher_jar.extract('.env', path=temp_dir)

    compose_file = os.path.join(temp_dir, 'docker-compose.yml')

    # Creating the credentials volume.
    credentials_dir = os.path.join(temp_dir, 'credentials_dir')
    os.makedirs(credentials_dir, exist_ok=True)

    logging.info('Copying the Google Application Default Credentials file.')

    application_default_path_file = self._default_credentials_file()
    application_default_path_copied = os.path.join(
        credentials_dir, 'application_default_credentials.json')

    if os.path.exists(application_default_path_file):
      shutil.copyfile(
          application_default_path_file, application_default_path_copied)
    else:
      logging.info(
          'GCP credentials will not be available for the transform service '
          'since could not find the Google Cloud application default '
          'credentials file at the expected location %s.',
          application_default_path_file)

    # Creating the dependencies volume.
    dependencies_dir = os.path.join(temp_dir, 'dependencies_dir')
    os.makedirs(dependencies_dir, exist_ok=True)

    self._environmental_variables = {
        'CREDENTIALS_VOLUME': credentials_dir,
        'DEPENDENCIES_VOLUME': dependencies_dir,
        'TRANSFORM_SERVICE_PORT': str(port),
    }
    # Environment values handed to subprocess must be strings; exporting None
    # makes Popen raise TypeError, so only export a version that was given.
    if beam_version is not None:
      self._environmental_variables['BEAM_VERSION'] = beam_version

    # Setting an empty requirements file
    requirements_file_name = os.path.join(dependencies_dir, 'requirements.txt')
    with open(requirements_file_name, 'w'):
      pass
    self._environmental_variables['PYTHON_REQUIREMENTS_FILE_NAME'] = (
        'requirements.txt')

    self._docker_compose_start_command_prefix = [
        'docker-compose', '-p', project_name, '-f', compose_file
    ]

  @staticmethod
  def _default_credentials_file():
    """Returns where gcloud stores the application default credentials."""
    if os.name == 'nt':
      # os.name is 'nt' on Windows, where gcloud keeps the file under
      # %APPDATA%\gcloud rather than under the home directory.
      config_root = os.environ.get('APPDATA') or os.path.join(
          str(Path.home()), 'AppData', 'Roaming')
      return os.path.join(
          config_root, 'gcloud', 'application_default_credentials.json')
    return os.path.join(
        str(Path.home()),
        '.config',
        'gcloud',
        'application_default_credentials.json')

  def _get_channel(self):
    """Opens a gRPC channel to the service with unlimited message sizes."""
    channel_options = [("grpc.max_receive_message_length", -1),
                       ("grpc.max_send_message_length", -1)]
    if hasattr(grpc, 'local_channel_credentials'):
      # TODO: update this to support secure non-local channels.
      return grpc.secure_channel(
          self._address,
          grpc.local_channel_credentials(),
          options=channel_options)
    else:
      return grpc.insecure_channel(self._address, options=channel_options)

  def __enter__(self):
    """Starts the service, waits for it and returns an expansion stub."""
    self.start()
    self.wait_till_up(-1)

    self._channel = self._get_channel()

    # Imported here rather than at module level (presumably to avoid an
    # import cycle with apache_beam.external -- TODO confirm).
    from apache_beam import external
    return external.ExpansionAndArtifactRetrievalStub(self._channel.__enter__())

  def __exit__(self, *args):
    """Shuts the service down and closes the channel opened on entry."""
    try:
      self.shutdown()
    finally:
      # Close the channel even when shutting the service down failed.
      self._channel.__exit__(*args)

  def _run_docker_compose_command(self, command, output_override=None):
    """Runs ``docker-compose`` for this project and reports its output.

    Args:
      command: list of arguments appended to the ``docker-compose -p ... -f
        ...`` prefix, e.g. ``['up', '-d']``.
      output_override: optional binary file-like object. When given, the
        combined stdout/stderr bytes are written to it; otherwise the decoded
        output is printed.
    """
    cmd = [*self._docker_compose_start_command_prefix, *command]

    myenv = os.environ.copy()
    # Popen rejects non-string environment values, so drop unset (None) ones.
    myenv.update({
        name: value
        for name, value in self._environmental_variables.items()
        if value is not None
    })

    process = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=myenv)
    std_out, _ = process.communicate()

    if output_override is not None:
      output_override.write(std_out)
    else:
      print(std_out.decode(errors='backslashreplace'))

    if process.returncode:
      # Best effort: report the failure without changing the control flow.
      logging.warning(
          'Command %s exited with status %s.', cmd, process.returncode)

  def start(self):
    """Brings the service up in detached mode (``up -d``)."""
    with self._launcher_lock:
      self._run_docker_compose_command(['up', '-d'])

  def shutdown(self):
    """Takes the service down (``down``)."""
    with self._launcher_lock:
      self._run_docker_compose_command(['down'])

  def status(self):
    """Prints the state of the service containers (``ps``)."""
    with self._launcher_lock:
      self._run_docker_compose_command(['ps'])

  def wait_till_up(self, timeout_ms):
    """Blocks until a gRPC channel to the service becomes ready.

    Args:
      timeout_ms: maximum time to wait in milliseconds. A value ``<= 0``
        selects ``_DEFAULT_START_WAIT_TIMEOUT``.

    Raises:
      ValueError: if the channel is not ready within the timeout.
    """
    timeout_ms = (
        TransformServiceLauncher._DEFAULT_START_WAIT_TIMEOUT
        if timeout_ms <= 0 else timeout_ms)

    channel = self._get_channel()
    try:
      # Waiting till the service is up.
      channel_ready = grpc.channel_ready_future(channel)
      wait_secs = .1
      start_time = time.monotonic()
      while True:
        if (time.monotonic() - start_time) * 1000 > timeout_ms:
          raise ValueError(
              'Transform service did not start in %s seconds.' %
              (timeout_ms / 1000))
        try:
          channel_ready.result(timeout=wait_secs)
          break
        except (grpc.FutureTimeoutError, grpc.RpcError):
          # Back off gradually; only become noisy once the waits get long.
          wait_secs *= 1.2
          logging.log(
              logging.WARNING if wait_secs > 1 else logging.DEBUG,
              'Waiting for the transform service to be ready at %s.',
              self._address)
    finally:
      # This channel only exists to probe readiness; release it.
      channel.close()

    logging.info('Transform service ' + self._project_name + ' started.')

  def _get_status(self):
    """Writes the ``ps`` output to a temp file and returns the file's path.

    The file is created with ``delete=False``, so it is left on disk for the
    caller.
    """
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
      self._run_docker_compose_command(['ps'], tmp)
    return tmp.name
def main(argv):
  """Command line entry point that runs one transform service command.

  Recognized flags are ``--project_name``, ``--command`` (required, one of
  ``_COMMAND_POSSIBLE_VALUES``), ``--port`` and ``--beam_version`` (required).
  Unrecognized arguments are ignored, so ``sys.argv`` may be passed as is.

  Args:
    argv: list of command line arguments.

  Raises:
    ValueError: if the command is not a known one. Not expected in practice,
      since argparse already rejects values outside ``choices``.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument('--project_name', help='Docker Compose project name.')
  parser.add_argument(
      '--command',
      required=True,
      choices=_COMMAND_POSSIBLE_VALUES,
      help='Command to run. Possible values are ' +
      ', '.join(_COMMAND_POSSIBLE_VALUES))
  parser.add_argument(
      '--port',
      type=int,
      default=-1,
      help='External visible port of the transform service.')
  parser.add_argument(
      '--beam_version',
      required=True,
      help='Beam version of the expansion service containers to be used.')

  known_args, _ = parser.parse_known_args(argv)

  project_name = (
      TransformServiceLauncher._DEFAULT_PROJECT_NAME
      if known_args.project_name is None else known_args.project_name)

  # NOTE(review): a negative port is reported as "the default port" but is
  # still handed to the launcher unchanged -- confirm this is intended.
  port_description = (
      'the default port'
      if known_args.port < 0 else 'port ' + str(known_args.port))
  logging.info('Starting the Beam Transform Service at %s.', port_description)

  launcher = TransformServiceLauncher(
      project_name, known_args.port, known_args.beam_version)

  if known_args.command == 'up':
    launcher.start()
    launcher.wait_till_up(-1)
  elif known_args.command == 'down':
    launcher.shutdown()
  elif known_args.command == 'ps':
    launcher.status()
  else:
    # Defensive only: argparse `choices` rejects anything else beforehand.
    raise ValueError(
        'Unknown command %s possible values are %s' %
        (known_args.command, ', '.join(_COMMAND_POSSIBLE_VALUES)))
if __name__ == '__main__':
  # Script entry point: show INFO level messages, then hand the raw command
  # line over to main().
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  main(sys.argv)