Pipeline option patterns

The samples on this page show you common pipeline configurations. For more information about pipeline configuration options, see Creating a pipeline and Configuring pipeline options.

Retroactively logging runtime parameters

Use the ValueProvider interface to access runtime parameters after completing a pipeline job.

You can use the ValueProvider interface to pass runtime parameters to your pipeline, but you can only log the parameters from within the Beam DAG. A solution is to add a pipeline branch with a DoFn that processes a placeholder value and then logs the runtime parameters:


/**
 * Sample of PipelineOptions with a ValueProvider option argument.
 *
 * <p>Declares a single option, {@code stringValue}, whose type is {@code ValueProvider<String>}
 * rather than a plain {@code String}. The annotations on the getter give the option a
 * description and a default of {@code "Hello world!"}.
 */
public interface MyOptions extends PipelineOptions {
  /** Returns the provider for the {@code stringValue} option. */
  @Description("My option")
  @Default.String("Hello world!")
  ValueProvider<String> getStringValue();

  /**
   * Sets the provider for the {@code stringValue} option.
   *
   * @param value the provider to store for this option
   */
  void setStringValue(ValueProvider<String> value);
}

/**
 * Builds and runs a pipeline with two independent branches: a side branch whose only job is to
 * log the {@code stringValue} option from inside a DoFn, and the main branch that sums a few
 * integers.
 *
 * @param args command-line arguments parsed into {@link MyOptions}
 */
public static void accessingValueProviderInfoAfterRunSnip1(String[] args) {
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
  Pipeline pipeline = Pipeline.create(options);

  // DoFn that reads the options back from the processing context and logs the
  // ValueProvider-backed option. Logging is just an example sink; the value could
  // equally be written to an external database here.
  DoFn<Integer, Integer> logStringValueFn =
      new DoFn<Integer, Integer>() {
        @ProcessElement
        public void process(ProcessContext context) {
          MyOptions runtimeOptions = context.getPipelineOptions().as(MyOptions.class);
          LOG.info("Option StringValue was {}", runtimeOptions.getStringValue());
        }
      };

  // Logging branch: a single placeholder element drives the logging DoFn.
  pipeline.apply(Create.of(1)).apply(ParDo.of(logStringValueFn));

  // Main branch of the pipeline.
  pipeline.apply(Create.of(1, 2, 3, 4)).apply(Sum.integersGlobally());

  pipeline.run();
}


import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.utils.value_provider import RuntimeValueProvider
from apache_beam.io import WriteToText

class MyOptions(PipelineOptions):
  """Pipeline options that add a ``--string_value`` value-provider argument."""
  @classmethod
  def _add_argparse_args(cls, parser):
    """Registers ``--string_value`` (type ``str``) on the given parser.

    The argument is added with ``add_value_provider_argument`` rather than a
    plain ``add_argument``.
    """
    parser.add_value_provider_argument('--string_value', type=str)

class LogValueProvidersFn(beam.DoFn):
  """DoFn that logs the ``string_value`` runtime parameter.

  This example only logs the value, but the same hook could store it by
  pushing it to an external database.

  Args:
    string_vp: value provider for the ``string_value`` option; its ``get()``
      method is called inside ``process``.
  """
  def __init__(self, string_vp):
    self.string_vp = string_vp

  def process(self, an_int):
    """Logs the value provider's value; the input element is ignored.

    Args:
      an_int: placeholder element that triggers the call. It is not read.

    Returns:
      None, so this DoFn emits no output elements.
    """
    # Hand the value to logging as an argument instead of pre-formatting with
    # `%`: formatting is then deferred until the record is actually emitted,
    # and a tuple-valued option cannot be mistaken for multiple format args.
    logging.info('The string_value is %s', self.string_vp.get())
    # Another option (where you don't need to pass the value at all) is to
    # look the option up by name, with '' as the fallback value:
    logging.info(
        'The string value is %s',
        RuntimeValueProvider.get_value('string_value', str, ''))

# Default-constructed options shared by the pipeline and the MyOptions view.
pipeline_options = PipelineOptions()
# Create pipeline.
p = beam.Pipeline(options=pipeline_options)

# View of the same options that exposes the string_value value provider.
my_options = pipeline_options.view_as(MyOptions)

# Logging branch: one placeholder element drives the DoFn that logs the
# ValueProvider value.
log_fn = LogValueProvidersFn(my_options.string_value)
placeholder_pc = p | beam.Create([None])
_ = placeholder_pc | 'LogValueProvs' >> beam.ParDo(log_fn)

# Main branch: sum a few integers.
main_input_pc = p | "main_pc" >> beam.Create([1, 2, 3])
result_pc = main_input_pc | beam.combiners.Sum.Globally()

# Run the pipeline and block until it finishes.
pipeline_result = p.run()
pipeline_result.wait_until_finish()