"""A runner implementation that submits a job for remote execution.

import logging
import random
import string

import grpc

from apache_beam.portability.api import beam_job_api_pb2
from apache_beam.runners.job import utils as job_utils
from apache_beam.runners.job.manager import DockerRPCManager
from apache_beam.runners.runner import PipelineResult
from apache_beam.runners.runner import PipelineRunner

# Names exported by ``from <module> import *``: only the runner class.
__all__ = ['PythonRPCDirectRunner']

class PythonRPCDirectRunner(PipelineRunner):
    """Executes a single pipeline on the local machine inside a container.

    ``run`` creates a ``DockerRPCManager``, submits the pipeline to the
    service exposed by that manager, and returns a result object that keeps a
    reference to the manager for later state/message/cancel requests.
    """

    # A list of PTransformOverride objects to be applied before running a
    # pipeline using DirectRunner.
    # Currently this only works for overrides where the input and output types
    # do not change.
    # For internal SDK use only. This should not be updated by Beam pipeline
    # authors.
    _PTRANSFORM_OVERRIDES = []

    def __init__(self):
        self._cache = None

    def run(self, pipeline):
        """Remotely executes entire pipeline or parts reachable from node.

        Args:
          pipeline: The pipeline to submit. Its configured PTransform
            overrides are applied in place before submission, and its
            options (minus entries whose value is None) are sent along
            with the job.

        Returns:
          A ``PythonRPCDirectPipelineResult`` holding the job id from the
          submit response and the manager used to submit the job.

        Raises:
          grpc.RpcError: Re-raised after logging when the submission fails.
        """
        # Performing configured PTransform overrides.
        pipeline.replace_all(PythonRPCDirectRunner._PTRANSFORM_OVERRIDES)

        # Start the RPC co-process
        manager = DockerRPCManager()

        # Submit the job to the RPC co-process under a random six-letter name.
        job_name = 'Job-' + ''.join(
            random.choice(string.ascii_uppercase) for _ in range(6))

        # ``items()`` works on both Python 2 and 3; ``iteritems()`` is
        # Python 2 only. Unset (None) options are not forwarded.
        options = {
            key: value
            for key, value in pipeline._options.get_all_options().items()
            if value is not None
        }

        try:
            # NOTE(review): the call target was missing from the source as
            # received; it is restored here following the
            # ``manager.service.<rpc>(beam_job_api_pb2.<Request>(...))`` shape
            # used by the result class -- confirm against the job API.
            response = manager.service.run(
                beam_job_api_pb2.SubmitJobRequest(
                    pipeline=pipeline.to_runner_api(),
                    pipelineOptions=job_utils.dict_to_struct(options),
                    jobName=job_name))

            logging.info('Submitted a job with id: %s', response.jobId)

            # Return the result object that references the manager instance
            return PythonRPCDirectPipelineResult(response.jobId, manager)
        except grpc.RpcError:
            logging.error('Failed to run the job with name: %s', job_name)
            raise
class PythonRPCDirectPipelineResult(PipelineResult):
    """Represents a job submitted through an RPC job-service manager.

    Every operation is forwarded to ``job_manager.service`` using the job id
    this object was created with.
    """

    def __init__(self, job_id, job_manager):
        """Stores the job id and the manager whose service owns the job.

        Args:
          job_id: Identifier of the submitted job.
          job_manager: Object exposing a ``service`` attribute with the
            ``getState``, ``getMessageStream`` and ``cancel`` calls used
            below.
        """
        self.job_id = job_id
        self.manager = job_manager

    @property
    def state(self):
        """Returns the service's ``getState`` response for this job."""
        return self.manager.service.getState(
            beam_job_api_pb2.GetJobStateRequest(jobId=self.job_id))

    def wait_until_finish(self):
        """Logs every message on the job's message stream until it ends.

        State-change messages are logged with the state's enum name; all
        other messages are logged as-is. Returns None once the stream is
        exhausted.
        """
        messages_request = beam_job_api_pb2.JobMessagesRequest(
            jobId=self.job_id)
        for message in self.manager.service.getMessageStream(messages_request):
            if message.HasField('stateResponse'):
                logging.info(
                    'Current state of job: %s',
                    beam_job_api_pb2.JobState.Enum.Name(
                        message.stateResponse.state))
            else:
                logging.info('Message %s', message.messageResponse)
        # The stream ending is treated as the job having reached a terminal
        # state.
        logging.info('Job with id: %s in terminal state now.', self.job_id)

    def cancel(self):
        """Asks the service to cancel this job and returns its response."""
        return self.manager.service.cancel(
            beam_job_api_pb2.CancelJobRequest(jobId=self.job_id))

    def metrics(self):
        """Not supported by this result type; always raises."""
        raise NotImplementedError