Class View

java.lang.Object
org.apache.beam.sdk.transforms.View

public class View extends Object
Transforms for creating PCollectionViews from PCollections (to read them as side inputs).

While a PCollection<ElemT> has many values of type ElemT per window, a PCollectionView<ViewT> has a single value of type ViewT for each window. It can be thought of as a mapping from windows to values of type ViewT. The transforms here represent ways of converting the ElemT values in a window into a ViewT for that window.

When a ParDo transform is processing a main input element in a window w and a PCollectionView is read via DoFn.ProcessContext.sideInput(org.apache.beam.sdk.values.PCollectionView<T>), the value of the view for w is returned.
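
The window-to-value mapping described above can be made concrete with a plain-Java model. This is not Beam code and not how any runner stores side inputs. Windows are represented by hypothetical String labels such as "w1", and the class WindowedViewModel and its methods are illustrative assumptions. The constructor converts each window's ElemT values into one ViewT. sideInput(w) returns the value for the main input's window, mirroring what DoFn.ProcessContext.sideInput returns.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Plain-Java model (not Beam code) of a PCollectionView: exactly one ViewT per window.
 * Windows are hypothetical String labels such as "w1".
 */
final class WindowedViewModel<ElemT, ViewT> {
  private final Map<String, ViewT> valueByWindow = new LinkedHashMap<>();

  WindowedViewModel(Map<String, List<ElemT>> elementsByWindow,
                    Function<List<ElemT>, ViewT> viewFn) {
    // Convert all ElemT values of each window into a single ViewT for that window.
    for (Map.Entry<String, List<ElemT>> e : elementsByWindow.entrySet()) {
      valueByWindow.put(e.getKey(), viewFn.apply(new ArrayList<>(e.getValue())));
    }
  }

  /** Mirrors reading the side input: returns the view's value for the main input's window. */
  ViewT sideInput(String mainInputWindow) {
    return valueByWindow.get(mainInputWindow);
  }
}
```

With a summing view function, window "w1" holding 1, 2 and 3 maps to 6, while "w2" holding only 10 maps to 10.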

The SDK supports viewing a PCollection, per window, as a single value, a List, an Iterable, a Map, or a multimap (iterable-valued Map).

For a PCollection that contains a single value of type T per window, such as the output of Combine.globally(org.apache.beam.sdk.transforms.SerializableFunction<java.lang.Iterable<V>, V>), use asSingleton() to prepare it for use as a side input:


 PCollectionView<T> output = someOtherPCollection
     .apply(Combine.globally(...))
     .apply(View.<T>asSingleton());
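
The asSingleton() contract given under Method Details (NoSuchElementException for an empty input, IllegalArgumentException for more than one element) can be sketched in plain Java as follows. This is a hypothetical model of what a consuming DoFn observes for one window, not Beam's implementation. The class and method names are assumptions. Combining first, as with Combine.globally above, is the usual way to ensure there is a single element per window.

```java
import java.util.List;
import java.util.NoSuchElementException;

/** Plain-Java model (not Beam code) of reading an asSingleton() view for one window. */
final class SingletonViewModel {
  private SingletonViewModel() {}

  static <T> T readSingleton(List<T> windowContents) {
    if (windowContents.isEmpty()) {
      // Documented behavior: an empty input yields NoSuchElementException.
      throw new NoSuchElementException("Empty PCollection accessed as a singleton view.");
    }
    if (windowContents.size() > 1) {
      // Documented behavior: more than one element yields IllegalArgumentException.
      throw new IllegalArgumentException(
          "PCollection with more than one element accessed as a singleton view.");
    }
    return windowContents.get(0);
  }
}
```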
 

For a small PCollection whose windows can each fit entirely in memory, use asList() to prepare it for use as a List. When read as a side input, the entire list for a window will be cached in memory.


 PCollectionView<List<T>> output =
    smallPCollection.apply(View.<T>asList());
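
A single window of an asList() view can be modeled in plain Java as a fully materialized List. That is why it supports random access and size(), and why it must fit in memory. The class ListViewModel is a hypothetical illustration, not part of Beam. Returning an unmodifiable list is a choice made for this model, not a documented Beam guarantee.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java model (not Beam code) of an asList() view for one window. */
final class ListViewModel {
  private ListViewModel() {}

  /** Copies every element of the window into memory and exposes it as a read-only List. */
  static <T> List<T> asList(Iterable<T> windowContents) {
    List<T> copy = new ArrayList<>();
    for (T element : windowContents) {
      copy.add(element);
    }
    return Collections.unmodifiableList(copy);
  }
}
```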
 

If a PCollection of KV<K, V> is known to have a single value per window for each key, then use asMap() to view it as a Map<K, V>:


 PCollectionView<Map<K, V>> output =
     somePCollection.apply(View.<K, V>asMap());
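
The single-value-per-key requirement of asMap() can be modeled in plain Java. Here java.util.Map.Entry stands in for KV<K, V>, and the class MapViewModel is hypothetical. Collectors.toMap throws IllegalStateException when a key repeats. That exception belongs to this model and is not necessarily what a Beam runner raises.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Plain-Java model (not Beam code) of an asMap() view for one window. */
final class MapViewModel {
  private MapViewModel() {}

  /** Each key must appear at most once in the window's contents. */
  static <K, V> Map<K, V> asMap(List<Map.Entry<K, V>> windowContents) {
    // Collectors.toMap throws IllegalStateException on a duplicate key, standing in
    // for the requirement that each key have a single value per window.
    return windowContents.stream()
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }
}
```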
 

Otherwise, to access a PCollection of KV<K, V> as a Map<K, Iterable<V>> side input, use asMultimap():


 PCollectionView<Map<K, Iterable<V>>> output =
     somePCollection.apply(View.<K, V>asMultimap());
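
In contrast, an asMultimap() view groups every value under its key. The plain-Java model below shows the resulting Map<K, Iterable<V>> shape for one window. The class MultimapViewModel is hypothetical, and Map.Entry stands in for KV.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Beam code) of an asMultimap() view for one window. */
final class MultimapViewModel {
  private MultimapViewModel() {}

  /** Groups every value under its key; repeated keys are allowed. */
  static <K, V> Map<K, Iterable<V>> asMultimap(List<Map.Entry<K, V>> windowContents) {
    Map<K, List<V>> grouped = new LinkedHashMap<>();
    for (Map.Entry<K, V> kv : windowContents) {
      grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
    }
    // Expose the groups with the Map<K, Iterable<V>> shape that asMultimap() produces.
    return new LinkedHashMap<K, Iterable<V>>(grouped);
  }
}
```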
 

To iterate over an entire window of a PCollection via side input, use asIterable():


 PCollectionView<Iterable<T>> output =
     somePCollection.apply(View.<T>asIterable());
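
An asIterable() view promises only sequential access. The hypothetical model below exposes a window's elements as an Iterable with no size() or get(i), and its example consumer makes a single sequential pass. It illustrates the access pattern only. It says nothing about how a runner stores or streams the elements.

```java
import java.util.Collections;
import java.util.List;

/** Plain-Java model (not Beam code) of an asIterable() view for one window. */
final class IterableViewModel {
  private IterableViewModel() {}

  /** Exposes a window's elements only through iteration: no size(), no get(i). */
  static <T> Iterable<T> asIterable(List<T> windowContents) {
    List<T> readOnly = Collections.unmodifiableList(windowContents);
    return readOnly::iterator;
  }

  /** Example consumer: one sequential pass that counts the elements. */
  static <T> long count(Iterable<T> view) {
    long n = 0;
    for (T ignored : view) {
      n++;
    }
    return n;
  }
}
```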
 

Both asMultimap() and asMap() are useful for implementing lookup-based "joins" with the main input when the side input is small enough to fit in memory.

For example, if you represent a page on a website via some Page object and have some type UrlVisit that logs a visit to a URL, you could convert these into more fully structured PageVisit objects using a side input, along the lines of the following:


 PCollection<Page> pages = ... // pages fit into memory
 PCollection<UrlVisit> urlVisits = ... // very large collection
 final PCollectionView<Map<URL, Page>> urlToPageView = pages
     .apply(WithKeys.of( ... )) // extract the URL from the page
     .apply(View.<URL, Page>asMap());

 PCollection<PageVisit> pageVisits = urlVisits
     .apply(ParDo.of(new DoFn<UrlVisit, PageVisit>() {
         @ProcessElement
         public void processElement(ProcessContext context) {
             UrlVisit urlVisit = context.element();
             // Read the view's Map for the window of the current main input element.
             Map<URL, Page> urlToPage = context.sideInput(urlToPageView);
             Page page = urlToPage.get(urlVisit.getUrl());
             context.output(new PageVisit(page, urlVisit.getVisitData()));
         }
     }).withSideInputs(urlToPageView));
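
The per-element logic of the DoFn above reduces to a map lookup. In the plain-Java sketch below, String URLs and page titles are hypothetical stand-ins for URL, Page, UrlVisit and PageVisit. It also makes one detail explicit: Map.get returns null for a URL with no matching page, so the caller has to choose a policy. Emitting a placeholder, as here, is one assumed option; skipping the visit would be another.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model (not Beam code) of a lookup join: the small side (url -> page title)
 * is a Map, and each element of the large main input is looked up in it.
 */
final class LookupJoinModel {
  private LookupJoinModel() {}

  static List<String> joinVisits(List<String> visitedUrls, Map<String, String> urlToPageTitle) {
    List<String> pageVisits = new ArrayList<>();
    for (String url : visitedUrls) {
      String title = urlToPageTitle.get(url);
      // Map.get returns null for a URL with no page; this model emits a placeholder
      // instead of passing null on, which is one possible policy.
      pageVisits.add((title != null ? title : "<unknown page>") + " @ " + url);
    }
    return pageVisits;
  }
}
```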
 

See ParDo.SingleOutput.withSideInputs(org.apache.beam.sdk.values.PCollectionView<?>...) for details on how to access this view inside a ParDo over another PCollection.

  • Method Details

    • asSingleton

      public static <T> View.AsSingleton<T> asSingleton()
      Returns a View.AsSingleton transform that takes a PCollection with a single value per window as input and produces a PCollectionView that returns the value in the main input window when read as a side input.
      
       PCollection<InputT> input = ...
       CombineFn<InputT, ?, OutputT> yourCombineFn = ...
       PCollectionView<OutputT> output = input
           .apply(Combine.globally(yourCombineFn))
           .apply(View.<OutputT>asSingleton());
       

      If the input PCollection is empty, reading the view in the consuming DoFn throws NoSuchElementException.

      If the input PCollection contains more than one element, reading the view in the consuming DoFn throws IllegalArgumentException.

    • asList

      public static <T> View.AsList<T> asList()
      Returns a View.AsList transform that takes a PCollection and returns a PCollectionView mapping each window to a List containing all of the elements in the window.

      Use this view only when random access to the elements or the size of the PCollection is required; for sequential access, asIterable() performs significantly better.

      Some runners may require that the view fits in memory.

    • asIterable

      public static <T> View.AsIterable<T> asIterable()
      Returns a View.AsIterable transform that takes a PCollection as input and produces a PCollectionView mapping each window to an Iterable of the values in that window.

      Some runners may require that the view fits in memory.

    • asMap

      public static <K, V> View.AsMap<K,V> asMap()
      Returns a View.AsMap transform that takes a PCollection<KV<K, V>> as input and produces a PCollectionView mapping each window to a Map<K, V>. It is required that each key of the input be associated with a single value, per window. If this is not the case, precede this view with Combine.perKey, as in the example below, or alternatively use asMultimap().
      
       PCollection<KV<K, V>> input = ...
       CombineFn<V, ?, OutputT> yourCombineFn = ...
       PCollectionView<Map<K, OutputT>> output = input
           .apply(Combine.perKey(yourCombineFn))
           .apply(View.<K, OutputT>asMap());
       

      Some runners may require that the view fits in memory.

    • asMultimap

      public static <K, V> View.AsMultimap<K,V> asMultimap()
      Returns a View.AsMultimap transform that takes a PCollection<KV<K, V>> as input and produces a PCollectionView mapping each window to its contents as a Map<K, Iterable<V>> for use as a side input. In contrast to asMap(), it is not required that the keys in the input collection be unique.
      
       PCollection<KV<K, V>> input = ... // some keys may occur more than once
       PCollectionView<Map<K, Iterable<V>>> output = input.apply(View.<K, V>asMultimap());
       

      Some runners may require that the view fits in memory.
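
When keys repeat, the asMap() documentation above suggests applying Combine.perKey before the view. The effect on one window can be modeled in plain Java with Map.merge, which applies a binary combining function whenever a key is already present. The class name and the use of a BinaryOperator in place of a CombineFn are assumptions of this sketch. A real CombineFn also has separate accumulator and output types and may be applied to partial results in parallel.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Plain-Java model (not Beam code) of Combine.perKey(...) followed by asMap() for one window. */
final class CombinePerKeyThenMapModel {
  private CombinePerKeyThenMapModel() {}

  /** Produces one combined value per key, so the result satisfies asMap()'s requirement. */
  static <K, V> Map<K, V> combinePerKeyAsMap(List<Map.Entry<K, V>> windowContents,
                                             BinaryOperator<V> combineFn) {
    Map<K, V> combined = new LinkedHashMap<>();
    for (Map.Entry<K, V> kv : windowContents) {
      // Map.merge inserts the value for a new key, or applies combineFn to an existing one.
      combined.merge(kv.getKey(), kv.getValue(), combineFn);
    }
    return combined;
  }
}
```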