Source code for apache_beam.runners.experimental.python_rpc_direct.server

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A runner implementation that submits a job for remote execution.
"""
import time
import uuid
from concurrent import futures

import grpc

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.pipeline import Pipeline
from apache_beam.portability.api import beam_job_api_pb2
from apache_beam.portability.api import beam_job_api_pb2_grpc
from apache_beam.runners.runner import PipelineState

_ONE_DAY_IN_SECONDS = 60 * 60 * 24


class JobService(beam_job_api_pb2_grpc.JobServiceServicer):
  """Job service that executes submitted pipelines with the 'DirectRunner'.

  The result object of every submitted pipeline is kept in an in-memory
  dict keyed by a generated job id, so that later requests can query or
  cancel the job.
  """

  def __init__(self):
    # Maps job id (hex string) -> object returned by ``Pipeline.run()``.
    self.jobs = {}

  def run(self, request, context):
    """Build a pipeline from ``request.pipeline``, run it and register it.

    Args:
      request: Request carrying the pipeline definition in ``pipeline``.
      context: gRPC servicer context (unused).

    Returns:
      A ``SubmitJobResponse`` whose ``jobId`` identifies the new job.
    """
    # ``UUID.hex`` is available on both Python 2 and Python 3, whereas
    # ``UUID.get_hex()`` only exists on Python 2.
    job_id = uuid.uuid4().hex
    pipeline = Pipeline.from_runner_api(
        request.pipeline, 'DirectRunner', PipelineOptions())
    self.jobs[job_id] = pipeline.run()
    return beam_job_api_pb2.SubmitJobResponse(jobId=job_id)

  def getState(self, request, context):
    """Return the current state of the job named by ``request.jobId``.

    Raises:
      KeyError: If ``request.jobId`` is not a known job id.
    """
    pipeline_result = self.jobs[request.jobId]
    return beam_job_api_pb2.GetJobStateResponse(
        state=self._map_state_to_jobState(pipeline_result.state))

  def cancel(self, request, context):
    """Cancel the job named by ``request.jobId`` and report its state.

    Raises:
      KeyError: If ``request.jobId`` is not a known job id.
    """
    pipeline_result = self.jobs[request.jobId]
    pipeline_result.cancel()
    return beam_job_api_pb2.CancelJobResponse(
        state=self._map_state_to_jobState(pipeline_result.state))

  def getMessageStream(self, request, context):
    """Yield a single message holding the job's state after it finishes.

    ``wait_until_finish()`` is called on the stored result before the one
    and only ``JobMessagesResponse`` is yielded.

    Raises:
      KeyError: If ``request.jobId`` is not a known job id (raised when the
        generator is first advanced).
    """
    pipeline_result = self.jobs[request.jobId]
    pipeline_result.wait_until_finish()
    yield beam_job_api_pb2.JobMessagesResponse(
        stateResponse=beam_job_api_pb2.GetJobStateResponse(
            state=self._map_state_to_jobState(pipeline_result.state)))

  def getStateStream(self, request, context):
    """Report that state streaming is not implemented by this service."""
    context.set_details('Not Implemented for direct runner!')
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    # NOTE(review): this looks like a server-streaming RPC, so hand back an
    # empty iterator instead of None; the status set above is unchanged.
    return iter(())

  @staticmethod
  def _map_state_to_jobState(state):
    """Translate a ``PipelineState`` value into the matching ``JobState``.

    Args:
      state: One of the ``PipelineState`` constants.

    Returns:
      The corresponding ``beam_job_api_pb2.JobState`` value; ``UNKNOWN``
      maps to ``UNSPECIFIED``, every other state maps to its namesake.

    Raises:
      ValueError: If ``state`` is not one of the handled pipeline states.
    """
    if state == PipelineState.UNKNOWN:
      return beam_job_api_pb2.JobState.UNSPECIFIED
    if state == PipelineState.STOPPED:
      return beam_job_api_pb2.JobState.STOPPED
    if state == PipelineState.RUNNING:
      return beam_job_api_pb2.JobState.RUNNING
    if state == PipelineState.DONE:
      return beam_job_api_pb2.JobState.DONE
    if state == PipelineState.FAILED:
      return beam_job_api_pb2.JobState.FAILED
    if state == PipelineState.CANCELLED:
      return beam_job_api_pb2.JobState.CANCELLED
    if state == PipelineState.UPDATED:
      return beam_job_api_pb2.JobState.UPDATED
    if state == PipelineState.DRAINING:
      return beam_job_api_pb2.JobState.DRAINING
    if state == PipelineState.DRAINED:
      return beam_job_api_pb2.JobState.DRAINED
    raise ValueError('Unknown pipeline state')
def serve(address='[::]:50051', max_workers=10):
  """Start the job service gRPC server and keep it running until interrupted.

  Args:
    address: Address handed to ``add_insecure_port``. Defaults to
      ``'[::]:50051'``, the value that was previously hard-coded.
    max_workers: Size of the thread pool that backs the gRPC server.
      Defaults to 10, the value that was previously hard-coded.
  """
  server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
  beam_job_api_pb2_grpc.add_JobServiceServicer_to_server(JobService(), server)
  server.add_insecure_port(address)
  server.start()
  try:
    # Keep the calling thread alive by sleeping in a loop; presumably the
    # server handles requests on its worker threads in the meantime.
    while True:
      time.sleep(_ONE_DAY_IN_SECONDS)
  except KeyboardInterrupt:
    # Ctrl-C: stop the server with a grace period of 0.
    server.stop(0)
# Start the job service when this module is executed as a script.
if __name__ == '__main__':
  serve()