# Source code for apache_beam.utils.transform_service_launcher
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import zipfile
from pathlib import Path
import grpc
from apache_beam.utils import subprocess_server
# Logger named after this module.
_LOGGER = logging.getLogger(__name__)
# Values accepted by the ``--command`` flag parsed in ``main``.
_COMMAND_POSSIBLE_VALUES = ['up', 'down', 'ps']
# Target handed to ``JavaJarServer.path_to_beam_jar`` to locate the launcher
# jar; ``docker-compose.yml`` and ``.env`` are extracted from that jar.
_EXPANSION_SERVICE_LAUNCHER_JAR = ':sdks:java:transform-service:app:build'
class TransformServiceLauncher(object):
  """Drives a Docker Compose project that hosts a Beam Transform Service.

  Construction prepares a per-project working directory under the system
  temporary directory: ``docker-compose.yml`` and ``.env`` are extracted from
  the launcher jar, the Google application default credentials file is copied
  in when it exists, and an empty ``requirements.txt`` is created. Docker
  Compose commands are then run against that directory.

  Instances are cached per ``project_name`` (see ``__new__``). Python still
  runs ``__init__`` on every construction, so constructing a launcher for an
  already-known project re-applies the setup with the newly supplied ``port``
  and ``beam_version``.

  The instance is a context manager: entering starts the service, waits until
  it accepts connections and returns an
  ``external.ExpansionAndArtifactRetrievalStub``; exiting shuts the service
  down and closes the channel.
  """

  _DEFAULT_PROJECT_NAME = 'apache.beam.transform.service'
  # Used by ``wait_till_up`` when the caller passes a non-positive timeout.
  # Interpreted as milliseconds.
  _DEFAULT_START_WAIT_TIMEOUT = 50000

  # Maintaining a static map of launchers (keyed by project name) to prevent
  # temporary resources from being created unnecessarily.
  _launchers = {}  # type: ignore

  def __new__(cls, project_name, port, beam_version=None):
    """Returns the cached launcher for ``project_name``, creating it once."""
    if project_name not in TransformServiceLauncher._launchers:
      TransformServiceLauncher._launchers[project_name] = super(
          TransformServiceLauncher, cls).__new__(cls)
    return TransformServiceLauncher._launchers[project_name]

  def __init__(self, project_name, port, beam_version=None):
    """Prepares the Docker Compose working directory and environment.

    Args:
      project_name: Docker Compose project name; also used as the name of the
        working directory created under the system temporary directory.
      port: port of the transform service; exported to Docker Compose as
        ``TRANSFORM_SERVICE_PORT`` and used to build the ``localhost`` address.
      beam_version: value exported as ``BEAM_VERSION``. When ``None`` the
        variable is not set by this launcher.
    """
    _LOGGER.info('Initializing the Beam Transform Service %s.', project_name)

    self._project_name = project_name
    self._port = port
    self._address = 'localhost:' + str(self._port)

    # __init__ runs again whenever the cached instance is re-constructed;
    # keep the existing lock so a holder is never left with a stale one.
    if getattr(self, '_launcher_lock', None) is None:
      self._launcher_lock = threading.RLock()

    # Setting up Docker Compose configuration.
    # We use Docker Compose project name as the name of the temporary directory
    # to isolate different transform service instances that may be running in
    # the same machine.
    temp_dir = os.path.join(tempfile.gettempdir(), project_name)
    os.makedirs(temp_dir, exist_ok=True)

    # Get the jar with configs.
    path_to_local_jar = subprocess_server.JavaJarServer.local_jar(
        subprocess_server.JavaJarServer.path_to_beam_jar(
            _EXPANSION_SERVICE_LAUNCHER_JAR))
    with zipfile.ZipFile(path_to_local_jar) as launcher_jar:
      launcher_jar.extract('docker-compose.yml', path=temp_dir)
      launcher_jar.extract('.env', path=temp_dir)
    compose_file = os.path.join(temp_dir, 'docker-compose.yml')

    # Creating the credentials volume.
    credentials_dir = os.path.join(temp_dir, 'credentials_dir')
    os.makedirs(credentials_dir, exist_ok=True)

    _LOGGER.info('Copying the Google Application Default Credentials file.')
    application_default_path_file = self._default_credentials_path()
    application_default_path_copied = os.path.join(
        credentials_dir, 'application_default_credentials.json')
    if os.path.exists(application_default_path_file):
      shutil.copyfile(
          application_default_path_file, application_default_path_copied)
    else:
      _LOGGER.info(
          'GCP credentials will not be available for the transform service '
          'since could not find the Google Cloud application default '
          'credentials file at the expected location %s.',
          application_default_path_file)

    # Creating the dependencies volume.
    dependencies_dir = os.path.join(temp_dir, 'dependencies_dir')
    os.makedirs(dependencies_dir, exist_ok=True)

    self._environmental_variables = {
        'CREDENTIALS_VOLUME': credentials_dir,
        'DEPENDENCIES_VOLUME': dependencies_dir,
        'TRANSFORM_SERVICE_PORT': str(port),
    }
    # Environment values handed to subprocess must be strings; a None value
    # would make every Docker Compose invocation fail with TypeError.
    if beam_version is not None:
      self._environmental_variables['BEAM_VERSION'] = str(beam_version)

    # Setting an empty requirements file.
    requirements_file_name = os.path.join(dependencies_dir, 'requirements.txt')
    with open(requirements_file_name, 'w'):
      pass
    self._environmental_variables['PYTHON_REQUIREMENTS_FILE_NAME'] = (
        'requirements.txt')

    self._docker_compose_start_command_prefix = [
        'docker-compose', '-p', project_name, '-f', compose_file
    ]
    # Public alias kept for backward compatibility. It used to carry a
    # 'TODO path' placeholder instead of the real compose file.
    self.docker_compose_command_prefix = list(
        self._docker_compose_start_command_prefix)

  @staticmethod
  def _default_credentials_path(os_name=None, environ=None):
    """Returns where the gcloud application default credentials are expected.

    Args:
      os_name: value to use instead of ``os.name`` (``'nt'`` selects the
        Windows location).
      environ: mapping to use instead of ``os.environ``.

    Returns:
      ``<APPDATA>/gcloud/application_default_credentials.json`` on Windows,
      otherwise ``<home>/.config/gcloud/application_default_credentials.json``.
    """
    os_name = os.name if os_name is None else os_name
    environ = os.environ if environ is None else environ
    file_name = 'application_default_credentials.json'
    if os_name == 'nt':
      # Fall back to the conventional roaming profile when APPDATA is unset.
      config_root = environ.get('APPDATA') or os.path.join(
          str(Path.home()), 'AppData', 'Roaming')
      return os.path.join(config_root, 'gcloud', file_name)
    return os.path.join(str(Path.home()), '.config', 'gcloud', file_name)

  def _get_channel(self):
    """Creates a gRPC channel to the service with unlimited message sizes."""
    channel_options = [("grpc.max_receive_message_length", -1),
                       ("grpc.max_send_message_length", -1)]
    if hasattr(grpc, 'local_channel_credentials'):
      # TODO: update this to support secure non-local channels.
      return grpc.secure_channel(
          self._address,
          grpc.local_channel_credentials(),
          options=channel_options)
    else:
      return grpc.insecure_channel(self._address, options=channel_options)

  def __enter__(self):
    """Starts the service, waits for readiness and returns an expansion stub."""
    self.start()
    self.wait_till_up(-1)

    self._channel = self._get_channel()

    # Imported here rather than at module level, as in the original code.
    from apache_beam import external
    return external.ExpansionAndArtifactRetrievalStub(self._channel.__enter__())

  def __exit__(self, *args):
    """Shuts the service down and always closes the channel."""
    try:
      self.shutdown()
    finally:
      # Close the channel even if Docker Compose could not be run.
      self._channel.__exit__(*args)

  def _run_docker_compose_command(self, command, output_override=None):
    """Runs ``docker-compose <prefix> <command>`` and forwards its output.

    Args:
      command: list of extra Docker Compose arguments, e.g. ``['ps']``.
      output_override: optional binary writable; when given, the combined
        stdout/stderr bytes are written to it instead of being printed.
    """
    cmd = list(self._docker_compose_start_command_prefix)
    cmd.extend(command)

    myenv = os.environ.copy()
    myenv.update(self._environmental_variables)

    process = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=myenv)
    std_out, _ = process.communicate()

    if output_override is not None:
      output_override.write(std_out)
    else:
      print(std_out.decode(errors='backslashreplace'))

    if process.returncode:
      # Stay best-effort (no exception), but do not hide the failure.
      _LOGGER.warning(
          'Docker Compose command %s exited with code %s.',
          cmd,
          process.returncode)

  # NOTE(review): start(), shutdown() and status() are called by __enter__,
  # __exit__ and main() but were absent from the reviewed source. They are
  # reconstructed from those call sites and the 'up' / 'down' / 'ps' command
  # names; confirm against the upstream implementation.
  def shutdown(self):
    """Stops the service by running ``docker-compose down``."""
    with self._launcher_lock:
      self._run_docker_compose_command(['down'])

  def start(self):
    """Starts the service by running ``docker-compose up -d``."""
    with self._launcher_lock:
      # Detached mode so that the command returns once containers are started.
      self._run_docker_compose_command(['up', '-d'])

  def status(self):
    """Prints the output of ``docker-compose ps``."""
    with self._launcher_lock:
      self._run_docker_compose_command(['ps'])

  def wait_till_up(self, timeout_ms):
    """Blocks until the service accepts gRPC connections.

    Args:
      timeout_ms: maximum wait in milliseconds; a value ``<= 0`` selects
        ``_DEFAULT_START_WAIT_TIMEOUT``.

    Raises:
      ValueError: if the service is not ready within the timeout.
    """
    timeout_ms = (
        TransformServiceLauncher._DEFAULT_START_WAIT_TIMEOUT
        if timeout_ms <= 0 else timeout_ms)

    channel = self._get_channel()
    # Waiting till the service is up.
    channel_ready = grpc.channel_ready_future(channel)
    wait_secs = .1
    start_time = time.monotonic()
    try:
      while True:
        remaining_secs = timeout_ms / 1000 - (time.monotonic() - start_time)
        if remaining_secs <= 0:
          raise ValueError(
              'Transform service did not start in %s seconds.' %
              (timeout_ms / 1000))
        try:
          # Never wait past the deadline, however large the backoff has grown.
          channel_ready.result(timeout=min(wait_secs, remaining_secs))
          break
        except (grpc.FutureTimeoutError, grpc.RpcError):
          wait_secs *= 1.2
          _LOGGER.log(
              logging.WARNING if wait_secs > 1 else logging.DEBUG,
              'Waiting for the transform service to be ready at %s.',
              self._address)
    finally:
      # The probe channel is only needed for the readiness check.
      channel_ready.cancel()
      channel.close()

    _LOGGER.info('Transform service %s started.', self._project_name)

  def _get_status(self):
    """Writes ``docker-compose ps`` output to a temp file and returns its path.

    The file is created with ``delete=False``; removing it is left to the
    caller.
    """
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
      self._run_docker_compose_command(['ps'], tmp)
    return tmp.name
def main(argv):
  """Command-line entry point: runs one Docker Compose command for a service.

  Args:
    argv: list of command-line arguments. Entries that match no declared flag
      (including a leading program name, as in ``sys.argv``) are ignored
      because ``parse_known_args`` is used.

  Raises:
    ValueError: if the parsed command is not one of
      ``_COMMAND_POSSIBLE_VALUES``.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument('--project_name', help='Docker Compose project name.')
  parser.add_argument(
      '--command',
      required=True,
      choices=_COMMAND_POSSIBLE_VALUES,
      help='Command to run. Possible values are ' +
      ', '.join(_COMMAND_POSSIBLE_VALUES))
  parser.add_argument(
      '--port',
      type=int,
      default=-1,
      help='External visible port of the transform service.')
  parser.add_argument(
      '--beam_version',
      required=True,
      help='Beam version of the expansion service containers to be used.')

  known_args, _ = parser.parse_known_args(argv)

  project_name = (
      TransformServiceLauncher._DEFAULT_PROJECT_NAME
      if known_args.project_name is None else known_args.project_name)

  port_description = (
      'the default port'
      if known_args.port < 0 else 'port ' + str(known_args.port))
  # Deliberately the root-logger helper: logging.info() installs a default
  # handler when none is configured, which makes INFO output visible when this
  # module is run as a script.
  logging.info('Starting the Beam Transform Service at %s.', port_description)

  launcher = TransformServiceLauncher(
      project_name, known_args.port, known_args.beam_version)

  if known_args.command == 'up':
    launcher.start()
    launcher.wait_till_up(-1)
  elif known_args.command == 'down':
    launcher.shutdown()
  elif known_args.command == 'ps':
    launcher.status()
  else:
    # Defensive only: argparse already restricts --command to the choices.
    raise ValueError(
        'Unknown command %s possible values are %s' %
        (known_args.command, ', '.join(_COMMAND_POSSIBLE_VALUES)))
if __name__ == '__main__':
  # Let INFO-level records through the root logger before running the CLI.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  main(sys.argv)