#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A runner implementation that submits a job for remote execution.
"""
import time
import uuid
from concurrent import futures
import grpc
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.pipeline import Pipeline
from apache_beam.portability.api import beam_job_api_pb2
from apache_beam.portability.api import beam_job_api_pb2_grpc
from apache_beam.runners.runner import PipelineState
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
[docs]class JobService(beam_job_api_pb2_grpc.JobServiceServicer):
  def __init__(self):
    self.jobs = {}
[docs]  def run(self, request, context):
    job_id = uuid.uuid4().get_hex()
    pipeline_result = Pipeline.from_runner_api(
        request.pipeline,
        'DirectRunner',
        PipelineOptions()).run()
    self.jobs[job_id] = pipeline_result
    return beam_job_api_pb2.SubmitJobResponse(jobId=job_id) 
[docs]  def getState(self, request, context):
    pipeline_result = self.jobs[request.jobId]
    return beam_job_api_pb2.GetJobStateResponse(
        state=self._map_state_to_jobState(pipeline_result.state)) 
[docs]  def cancel(self, request, context):
    pipeline_result = self.jobs[request.jobId]
    pipeline_result.cancel()
    return beam_job_api_pb2.CancelJobResponse(
        state=self._map_state_to_jobState(pipeline_result.state)) 
[docs]  def getMessageStream(self, request, context):
    pipeline_result = self.jobs[request.jobId]
    pipeline_result.wait_until_finish()
    yield beam_job_api_pb2.JobMessagesResponse(
        stateResponse=beam_job_api_pb2.GetJobStateResponse(
            state=self._map_state_to_jobState(pipeline_result.state))) 
[docs]  def getStateStream(self, request, context):
    context.set_details('Not Implemented for direct runner!')
    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
    return 
  @staticmethod
  def _map_state_to_jobState(state):
    if state == PipelineState.UNKNOWN:
      return beam_job_api_pb2.JobState.UNSPECIFIED
    elif state == PipelineState.STOPPED:
      return beam_job_api_pb2.JobState.STOPPED
    elif state == PipelineState.RUNNING:
      return beam_job_api_pb2.JobState.RUNNING
    elif state == PipelineState.DONE:
      return beam_job_api_pb2.JobState.DONE
    elif state == PipelineState.FAILED:
      return beam_job_api_pb2.JobState.FAILED
    elif state == PipelineState.CANCELLED:
      return beam_job_api_pb2.JobState.CANCELLED
    elif state == PipelineState.UPDATED:
      return beam_job_api_pb2.JobState.UPDATED
    elif state == PipelineState.DRAINING:
      return beam_job_api_pb2.JobState.DRAINING
    elif state == PipelineState.DRAINED:
      return beam_job_api_pb2.JobState.DRAINED
    else:
      raise ValueError('Unknown pipeline state') 
[docs]def serve():
  server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
  beam_job_api_pb2_grpc.add_JobServiceServicer_to_server(JobService(), server)
  server.add_insecure_port('[::]:50051')
  server.start()
  try:
    while True:
      time.sleep(_ONE_DAY_IN_SECONDS)
  except KeyboardInterrupt:
    server.stop(0) 
if __name__ == '__main__':
  serve()