PTransform Style Guide

A style guide for writers of new reusable PTransforms.

Language-neutral considerations

Consistency

Be consistent with prior art:

Exposing a PTransform vs. something else

So you want to develop a library that people will use in their Beam pipelines - a connector to a third-party system, a machine learning algorithm, etc. How should you expose it?

Do:

Do not:

Naming

Do:

Do not:

Configuration

What goes into configuration vs. input collection

What parameters to expose

Do:

Do not:

Error handling

Transform configuration errors

Detect errors early. Errors can be detected at the following stages:

For example:

Runtime errors and data consistency

Favor data consistency above everything else. Do not mask data loss or corruption. If data loss can’t be prevented, fail.

Do:

Do not:

Performance

Many runners optimize chains of ParDos in ways that improve performance if the ParDos emit a small to moderate number of elements per input element, or have relatively cheap per-element processing (e.g. Dataflow’s “fusion”), but limit parallelization if these assumptions are violated. In that case you may need a “fusion break” (Reshuffle.of()) to improve the parallelizability of processing the output PCollection of the ParDo.

Documentation

Document how to configure the transform (give code examples), and what guarantees it expects about its input or provides about its output, accounting for the Beam model. E.g.:

Logging

Anticipate abnormal situations that a user of the transform may run into. Log information that they would have found sufficient for debugging, but limit the volume of logging. Here is some advice that applies to all programs, but is especially important when data volume is massive and execution is distributed.

Do:

Do not:

Testing

Data processing is tricky, full of corner cases, and difficult to debug, because pipelines take a long time to run, it’s hard to check if the output is correct, you can’t attach a debugger, and you often can’t log as much as you wish to, due to high volume of data. Because of that, testing is particularly important.

Testing the transform’s run-time behavior

Testing transform construction and validation

The code for constructing and validating a transform is usually trivial and mostly boilerplate. However, minor mistakes or typos in it can have serious consequences (e.g. ignoring a property that the user has set), so it needs to be tested as well. Yet, an excessive amount of trivial tests can be hard to maintain and give a false impression that the transform is well-tested.

Do:

Do not:

Compatibility

Do:

Do not:

Java specific considerations

Good examples for most of the practices below are JdbcIO and MongoDbIO.

API

Choosing types of input and output PCollections

Whenever possible, use types specific to the nature of the transform. People can wrap it with conversion DoFns from their own types if necessary. E.g. a Datastore connector should use the Datastore Entity type, and a MongoDb connector should use the Mongo Document type, not a String representation of the JSON.

Sometimes that’s not possible: for example, JDBC does not provide a Beam-compatible “JDBC record” datatype, i.e. one that can be encoded with a Coder. In that case, let the user provide a function for converting between the transform-specific type and a Beam-compatible type (e.g. see JdbcIO and MongoDbGridFSIO).
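
The user-supplied conversion can be sketched in plain Java, without Beam or JDBC on the classpath. All names here (RawRecord, RecordMapper, MappedRead, withRecordMapper) are hypothetical and only illustrate the shape of such an API; real connectors define their own interfaces for this purpose.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical system-specific record that has no Beam-compatible encoding. */
final class RawRecord {
  private final Map<String, Object> columns;

  RawRecord(Map<String, Object> columns) {
    this.columns = columns;
  }

  Object get(String column) {
    return columns.get(column);
  }
}

/** User-supplied conversion from the system-specific type to an encodable type. */
interface RecordMapper<T> extends Serializable {
  T mapRecord(RawRecord record);
}

/** Hypothetical stand-in for a read transform that is configured with a mapper. */
final class MappedRead<T> {
  private final RecordMapper<T> mapper;

  private MappedRead(RecordMapper<T> mapper) {
    this.mapper = mapper;
  }

  static <T> MappedRead<T> withRecordMapper(RecordMapper<T> mapper) {
    if (mapper == null) {
      throw new IllegalArgumentException("mapper can not be null");
    }
    return new MappedRead<>(mapper);
  }

  /** Converts every raw record; a real transform would do this inside a DoFn. */
  List<T> convertAll(List<RawRecord> records) {
    List<T> out = new ArrayList<>();
    for (RawRecord record : records) {
      out.add(mapper.mapRecord(record));
    }
    return out;
  }
}
```

The transform itself never needs to know the user’s type; it only needs the mapper to be serializable so that it can be shipped to workers.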

When the transform should logically return a composite type for which no Java class exists yet, create a new POJO class with well-named fields. Do not use generic tuple classes or KV (unless the fields are legitimately a key and a value).
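
As a minimal sketch of such a class, written by hand in plain Java (the class name ClassifiedItem and its fields are made up for illustration):

```java
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical result type: a named class instead of a generic tuple or KV. */
final class ClassifiedItem implements Serializable {
  private final String itemId;
  private final String label;
  private final double confidence;

  ClassifiedItem(String itemId, String label, double confidence) {
    this.itemId = Objects.requireNonNull(itemId, "itemId");
    this.label = Objects.requireNonNull(label, "label");
    this.confidence = confidence;
  }

  String getItemId() {
    return itemId;
  }

  String getLabel() {
    return label;
  }

  double getConfidence() {
    return confidence;
  }

  // Value semantics, so that results can be compared in tests.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ClassifiedItem)) {
      return false;
    }
    ClassifiedItem that = (ClassifiedItem) o;
    return itemId.equals(that.itemId)
        && label.equals(that.label)
        && Double.compare(confidence, that.confidence) == 0;
  }

  @Override
  public int hashCode() {
    return Objects.hash(itemId, label, confidence);
  }

  @Override
  public String toString() {
    return "ClassifiedItem{itemId=" + itemId + ", label=" + label
        + ", confidence=" + confidence + "}";
  }
}
```

Compared with a pair of strings plus a double, a reader of item.getLabel() does not have to remember which position holds which value.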

Transforms with multiple output collections

If the transform needs to return multiple collections, it should be a PTransform<..., PCollectionTuple> and expose methods getBlahTag() for each collection.

E.g. if you want to return a PCollection<Foo> and a PCollection<Bar>, expose TupleTag<Foo> getFooTag() and TupleTag<Bar> getBarTag().

For example:

public class MyTransform extends PTransform<..., PCollectionTuple> {
  private final TupleTag<Moo> mooTag = new TupleTag<Moo>() {};
  private final TupleTag<Blah> blahTag = new TupleTag<Blah>() {};
  ...
  @Override
  public PCollectionTuple expand(... input) {
    ...
    PCollection<Moo> moo = ...;
    PCollection<Blah> blah = ...;
    return PCollectionTuple.of(mooTag, moo)
                           .and(blahTag, blah);
  }

  public TupleTag<Moo> getMooTag() {
    return mooTag;
  }

  public TupleTag<Blah> getBlahTag() {
    return blahTag;
  }
  ...
}

Fluent builders for configuration

Make the transform class immutable, with methods to produce modified immutable objects. Use AutoValue, which can also generate a Builder helper class. Use @Nullable to mark parameters of class type that don’t have a default value or whose default value is null. Primitive types (e.g. int) can’t be null and don’t need the annotation.

@AutoValue
public abstract static class MyTransform extends PTransform<...> {
  abstract int getMoo();
  @Nullable abstract String getBlah();

  abstract Builder toBuilder();

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setMoo(int moo);
    abstract Builder setBlah(String blah);

    abstract MyTransform build();
  }
  ...
}

Factory methods

Provide a single argumentless static factory method, either in the enclosing class (see “Packaging a family of transforms”) or in the transform class itself.

public class Thumbs {
  public static Twiddle twiddle() {
    return new AutoValue_Thumbs_Twiddle.Builder().build();
  }

  public abstract static class Twiddle extends PTransform<...> { ... }
}

// or:
public abstract class TwiddleThumbs extends PTransform<...> {
  public static TwiddleThumbs create() {
    return new AutoValue_TwiddleThumbs.Builder().build();
  }
  ...
}

Exception: when the transform has a single overwhelmingly most important parameter, call the factory method of() and pass that parameter as its argument, e.g. ParDo.of(DoFn).withSideInputs().

Fluent builder methods for setting parameters

Call them withBlah(). All builder methods must return exactly the same type; if it’s a parameterized (generic) type, with the same values of type parameters.

Treat withBlah() methods as an unordered set of keyword arguments: the result must not depend on the order in which you call withFoo() and withBar() (e.g., withBar() must not read the current value of foo).
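
The order-independence requirement can be illustrated with a small hand-written immutable class. This is only a sketch in plain Java: TwiddleConfig and its defaults are hypothetical, and a real transform would typically use AutoValue instead of a hand-written constructor.

```java
import java.util.Objects;

/** Hypothetical immutable configuration with two independent parameters. */
final class TwiddleConfig {
  private final int foo;
  private final String bar;

  private TwiddleConfig(int foo, String bar) {
    this.foo = foo;
    this.bar = bar;
  }

  /** Factory method; the defaults (0 and "") are made up for this sketch. */
  static TwiddleConfig create() {
    return new TwiddleConfig(0, "");
  }

  /** Sets foo only; bar is carried over unchanged and does not influence foo. */
  TwiddleConfig withFoo(int foo) {
    return new TwiddleConfig(foo, this.bar);
  }

  /** Sets bar only; foo is carried over unchanged and does not influence bar. */
  TwiddleConfig withBar(String bar) {
    return new TwiddleConfig(this.foo, Objects.requireNonNull(bar, "bar"));
  }

  int getFoo() {
    return foo;
  }

  String getBar() {
    return bar;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof TwiddleConfig)) {
      return false;
    }
    TwiddleConfig that = (TwiddleConfig) o;
    return foo == that.foo && bar.equals(that.bar);
  }

  @Override
  public int hashCode() {
    return Objects.hash(foo, bar);
  }
}
```

Because each method only replaces its own field, create().withFoo(7).withBar("x") and create().withBar("x").withFoo(7) produce equal objects. A withBar() that, say, appended the current foo to its argument would break that property.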

Document implications of each withBlah method: when to use this method at all, what values are allowed, what is the default, what are the implications of changing the value.

/**
 * Returns a new {@link Twiddle} transform with moo set
 * to the given value.
 *
 * <p>Valid values are 0 (inclusive) to 100 (exclusive). The default is 42.
 *
 * <p>Higher values generally improve throughput, but increase chance
 * of spontaneous combustion.
 */
public Twiddle withMoo(int moo) {
  checkArgument(moo >= 0 && moo < 100,
      "Thumbs.Twiddle.withMoo() called with an invalid moo of %s. "
      + "Valid values are 0 (inclusive) to 100 (exclusive)",
      moo);
  return toBuilder().setMoo(moo).build();
}

Default values for parameters

Specify them in the factory method (factory method returns an object with default values).

public class Thumbs {
  public static Twiddle twiddle() {
    return new AutoValue_Thumbs_Twiddle.Builder().setMoo(42).build();
  }
  ...
}

Packaging multiple parameters into a reusable object

If several parameters of the transform are very tightly logically coupled, sometimes it makes sense to encapsulate them into a container object. Use the same guidelines for this container object (make it immutable, use AutoValue with builders, document withBlah() methods, etc.). For an example, see JdbcIO.DataSourceConfiguration.
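
A rough plain-Java sketch of this pattern follows. ConnectionConfig and ReadConfig are hypothetical names, the default port is invented, and the constructors are written by hand here only to keep the example free of dependencies; it is not how JdbcIO.DataSourceConfiguration is implemented.

```java
import java.util.Objects;

/** Hypothetical container for tightly coupled connection parameters. */
final class ConnectionConfig {
  private final String host;
  private final int port;

  private ConnectionConfig(String host, int port) {
    this.host = host;
    this.port = port;
  }

  /** The host is the essential parameter; the default port 9000 is made up. */
  static ConnectionConfig create(String host) {
    Objects.requireNonNull(host, "host can not be null");
    return new ConnectionConfig(host, 9000);
  }

  /** Returns a copy with the given port. Valid values are 1 to 65535. */
  ConnectionConfig withPort(int port) {
    if (port < 1 || port > 65535) {
      throw new IllegalArgumentException("port must be in [1, 65535], but was: " + port);
    }
    return new ConnectionConfig(host, port);
  }

  String getHost() {
    return host;
  }

  int getPort() {
    return port;
  }
}

/** Hypothetical read configuration that takes the container as one parameter. */
final class ReadConfig {
  private final ConnectionConfig connection; // null until set
  private final String query; // null until set

  private ReadConfig(ConnectionConfig connection, String query) {
    this.connection = connection;
    this.query = query;
  }

  static ReadConfig create() {
    return new ReadConfig(null, null);
  }

  ReadConfig withConnection(ConnectionConfig connection) {
    Objects.requireNonNull(connection, "connection can not be null");
    return new ReadConfig(connection, query);
  }

  ReadConfig withQuery(String query) {
    Objects.requireNonNull(query, "query can not be null");
    return new ReadConfig(connection, query);
  }

  ConnectionConfig getConnection() {
    return connection;
  }

  String getQuery() {
    return query;
  }
}
```

One ConnectionConfig instance can then be shared between several transforms, for example a read and a write against the same system, without repeating the host and port in each.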

Transforms with type parameters

All type parameters should be specified explicitly on the factory method. Builder methods (withBlah()) should not change the types.

public class Thumbs {
  public static <T> Twiddle<T> twiddle() {
    return new AutoValue_Thumbs_Twiddle.Builder<T>().build();
  }

  @AutoValue
  public abstract static class Twiddle<T>
       extends PTransform<PCollection<Foo>, PCollection<Bar<T>>> {
    
    @Nullable abstract Bar<T> getBar();

    abstract Builder<T> toBuilder();

    @AutoValue.Builder
    abstract static class Builder<T> {
      
      abstract Builder<T> setBar(Bar<T> bar);

      abstract Twiddle<T> build();
    }
    
  }
}

// User code:
Thumbs.Twiddle<String> twiddle = Thumbs.<String>twiddle();
// Or:
PCollection<Bar<String>> bars = foos.apply(Thumbs.<String>twiddle());

Exception: when the transform has a single most important parameter and this parameter depends on type T, then prefer to put it right into the factory method: e.g. Combine.globally(SerializableFunction<Iterable<V>,V>). This improves Java’s type inference and allows the user not to specify type parameters explicitly.
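
The effect on type inference can be seen in a small plain-Java sketch. Fold and FoldDemo are hypothetical stand-ins, and java.util.function.Function is used in place of Beam’s SerializableFunction to keep the example free of dependencies.

```java
import java.util.Arrays;
import java.util.function.Function;

/** Hypothetical generic transform-like class with one defining parameter. */
final class Fold<V> {
  private final Function<Iterable<V>, V> combiner; // null until set

  private Fold(Function<Iterable<V>, V> combiner) {
    this.combiner = combiner;
  }

  /** Argumentless factory: nothing in the call tells the compiler what V is. */
  static <V> Fold<V> create() {
    return new Fold<>(null);
  }

  Fold<V> withCombiner(Function<Iterable<V>, V> combiner) {
    return new Fold<>(combiner);
  }

  /** Factory that takes the defining parameter: V is inferred from the argument. */
  static <V> Fold<V> globally(Function<Iterable<V>, V> combiner) {
    return new Fold<>(combiner);
  }

  V apply(Iterable<V> values) {
    if (combiner == null) {
      throw new IllegalStateException("Must specify a combiner");
    }
    return combiner.apply(values);
  }
}

final class FoldDemo {
  static Integer sum(Iterable<Integer> values) {
    int total = 0;
    for (int v : values) {
      total += v;
    }
    return total;
  }

  public static void main(String[] args) {
    // Chaining off the argumentless factory needs an explicit type argument.
    int a = Fold.<Integer>create().withCombiner(FoldDemo::sum).apply(Arrays.asList(1, 2, 3));
    // With the parameter in the factory method, V = Integer is inferred.
    int b = Fold.globally(FoldDemo::sum).apply(Arrays.asList(1, 2, 3));
    System.out.println(a + " " + b); // prints "6 6"
  }
}
```

Without the explicit <Integer> in the first call, the compiler has no information about V at the point where create() is resolved, so the chained withCombiner(FoldDemo::sum) would be expected to fail type checking.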

When the transform has more than one type parameter, or if the meaning of the parameter is non-obvious, name the type parameters like SomethingT, e.g.: a PTransform that implements a classifier algorithm and assigns a label to each input element might be typed as Classify<InputT, LabelT>.

Injecting user-specified behavior

If the transform has an aspect of behavior to be customized by a user’s code, make a decision as follows:

Do:

Do not:

Packaging a family of transforms

When developing a family of highly related transforms (e.g. interacting with the same system in different ways, or providing different implementations of the same high-level task), use a top-level class as a namespace, with multiple factory methods returning transforms corresponding to each individual use case.

The container class must have a private constructor, so it can’t be instantiated directly.

Document common aspects at the level of the container class (e.g. FooIO), and document each factory method individually.

/** Transforms for clustering data. */
public class Cluster {
  // Force use of static factory methods.
  private Cluster() {}

  /** Returns a new {@link UsingKMeans} transform. */
  public static UsingKMeans usingKMeans() { ... }
  public static Hierarchically hierarchically() { ... }

  /** Clusters data using the K-Means algorithm. */
  public static class UsingKMeans extends PTransform<...> { ... }
  public static class Hierarchically extends PTransform<...> { ... }
}

public class FooIO {
  // Force use of static factory methods.
  private FooIO() {}

  public static Read read() { ... }
  ...

  public static class Read extends PTransform<...> { ... }
  public static class Write extends PTransform<...> { ... }
  public static class Delete extends PTransform<...> { ... }
  public static class Mutate extends PTransform<...> { ... }
}

When supporting multiple versions with incompatible APIs, use the version as a namespace-like class too, and put implementations of different API versions in different files.

// FooIO.java
public class FooIO {
  // Force use of static factory methods.
  private FooIO() {}

  public static FooV1 v1() { return new FooV1(); }
  public static FooV2 v2() { return new FooV2(); }
}

// FooV1.java
public class FooV1 {
  // Force use of static factory methods outside the package.
  FooV1() {}
  public static Read read() { ... }
  public static class Read extends PTransform<...> { ... }
}

// FooV2.java
public class FooV2 {
  // Force use of static factory methods outside the package.
  FooV2() {}
  public static Read read() { ... }

  public static class Read extends PTransform<...> { ... }
}

Behavior

Immutability

Serialization

DoFn, PTransform, CombineFn and other instances will be serialized. Keep the amount of serialized data to a minimum: Mark fields that you don’t want serialized as transient. Make classes static whenever possible (so that the instance doesn’t capture and serialize the enclosing class instance). Note: In some cases this means that you cannot use anonymous classes.
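
The difference between a static nested class and an anonymous class can be demonstrated with plain java.io serialization, used here as a stand-in for whatever mechanism a runner applies. SerializableFn, PipelineBuilder and SerializationCheck are hypothetical names for this sketch only.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical function interface standing in for a DoFn-like callback. */
interface SerializableFn extends Serializable {
  String apply(String input);
}

/** Enclosing class that is NOT serializable. */
final class PipelineBuilder {
  private final String prefix;

  PipelineBuilder(String prefix) {
    this.prefix = prefix;
  }

  /** Static nested class: serializes only its own non-transient fields. */
  static final class PrefixFn implements SerializableFn {
    private final String prefix;
    // Scratch buffer, rebuilt lazily; excluded from the serialized form.
    private transient StringBuilder scratch;

    PrefixFn(String prefix) {
      this.prefix = prefix;
    }

    @Override
    public String apply(String input) {
      if (scratch == null) {
        scratch = new StringBuilder();
      }
      scratch.setLength(0);
      return scratch.append(prefix).append(input).toString();
    }
  }

  SerializableFn staticFn() {
    return new PrefixFn(prefix);
  }

  /** Anonymous class in an instance method: captures PipelineBuilder.this. */
  SerializableFn anonymousFn() {
    return new SerializableFn() {
      @Override
      public String apply(String input) {
        return prefix + input; // reads a field of the enclosing instance
      }
    };
  }
}

final class SerializationCheck {
  /** Returns the serialized size in bytes, or -1 if the object is not serializable. */
  static int serializedSize(Object o) {
    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(o);
      out.flush();
      return bytes.size();
    } catch (NotSerializableException e) {
      return -1;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}
```

Both functions behave identically when called, but only the static nested one can be serialized: the anonymous one drags the non-serializable PipelineBuilder along with it.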

Validation

@AutoValue
public abstract class TwiddleThumbs
    extends PTransform<PCollection<Foo>, PCollection<Bar>> {
  abstract int getMoo();
  @Nullable abstract String getBoo();

  ...
  // Validating individual parameters
  public TwiddleThumbs withMoo(int moo) {
    checkArgument(
        moo >= 0 && moo < 100,
        "Moo must be between 0 (inclusive) and 100 (exclusive), but was: %s",
        moo);
    return toBuilder().setMoo(moo).build();
  }

  public TwiddleThumbs withBoo(String boo) {
    checkArgument(boo != null, "Boo can not be null");
    checkArgument(!boo.isEmpty(), "Boo can not be empty");
    return toBuilder().setBoo(boo).build();
  }

  @Override
  public void validate(PipelineOptions options) {
    int woo = options.as(TwiddleThumbsOptions.class).getWoo();
    checkArgument(
        woo < getMoo(),
        "Woo (%s) must be smaller than moo (%s)",
        woo, getMoo());
  }

  @Override
  public PCollection<Bar> expand(PCollection<Foo> input) {
    // Validating that a required parameter is present
    checkArgument(getBoo() != null, "Must specify boo");

    // Validating a combination of parameters
    checkArgument(
        getMoo() == 0 || getBoo() == null,
        "Must specify at most one of moo or boo, but was: moo = %s, boo = %s",
        getMoo(), getBoo());

    ...
  }
}

Coders

Coders are a way for a Beam runner to materialize intermediate data or transmit it between workers when necessary. A Coder should not be used as a general-purpose API for parsing or writing binary formats, because its particular binary encoding is intended to be a private implementation detail.

Providing default coders for types

Provide default Coders for all new data types. Use @DefaultCoder annotations or CoderProviderRegistrar classes annotated with @AutoService: see usages of these classes in the SDK for examples. If performance is not important, you can use SerializableCoder or AvroCoder. Otherwise, develop an efficient custom coder (subclass AtomicCoder for concrete types, StructuredCoder for generic types).

Setting coders on output collections

All PCollections created by your PTransform (both output and intermediate collections) must have a Coder set on them: a user should never need to call .setCoder() to “fix up” a coder on a PCollection produced by your PTransform (in fact, Beam intends to eventually deprecate setCoder). In some cases, coder inference will be sufficient to achieve this; in other cases, your transform will need to explicitly call setCoder on its collections.

If the collection is of a concrete type, that type usually has a corresponding coder. Use the most efficient coder specific to that type (e.g. StringUtf8Coder.of() for strings, ByteArrayCoder.of() for byte arrays, etc.), rather than a general-purpose coder like SerializableCoder.

If the type of the collection involves generic type variables, the situation is more complex: