Class View
Transforms for creating PCollectionViews from PCollections (to read them as side inputs).
While a PCollection<ElemT> has many values of type ElemT per
window, a PCollectionView<ViewT> has a single value of type
ViewT for each window. It can be thought of as a mapping from windows to values of type
ViewT. The transforms here represent ways of converting the ElemT values in a window
into a ViewT for that window.
When a ParDo transform is processing a main input element in a window w and a
PCollectionView is read via DoFn.ProcessContext.sideInput(org.apache.beam.sdk.values.PCollectionView<T>), the value of the view
for w is returned.
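The window-to-value idea above can be modeled in plain Java, without Beam, to make it concrete. This is a sketch under simplifying assumptions: windows are represented by hypothetical String labels, the main input and the side input are assumed to use the same windows (so no window mapping is needed), and the names WindowedViewModel, Windowed, createView, and sideInput are illustrative, not part of the SDK.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of a PCollectionView (NOT the Beam API): one ViewT per window,
// computed from all ElemT values that fall into that window.
final class WindowedViewModel {

  // Hypothetical element type: a value tagged with the label of its window.
  static final class Windowed<T> {
    final String window;
    final T value;

    Windowed(String window, T value) {
      this.window = window;
      this.value = value;
    }
  }

  // Groups elements by window, then turns each window's values into a single view value.
  static <ElemT, ViewT> Map<String, ViewT> createView(
      List<Windowed<ElemT>> elements, Function<List<ElemT>, ViewT> viewFn) {
    Map<String, List<ElemT>> byWindow = new HashMap<>();
    for (Windowed<ElemT> e : elements) {
      byWindow.computeIfAbsent(e.window, w -> new ArrayList<>()).add(e.value);
    }
    Map<String, ViewT> view = new HashMap<>();
    for (Map.Entry<String, List<ElemT>> entry : byWindow.entrySet()) {
      view.put(entry.getKey(), viewFn.apply(entry.getValue()));
    }
    return view;
  }

  // Models a side-input read: the view value for the main input element's window is returned.
  // Assumes main and side inputs share identical windows.
  static <ViewT> ViewT sideInput(Map<String, ViewT> view, String mainInputWindow) {
    return view.get(mainInputWindow);
  }
}
```

For example, with elements 1 and 2 in window "w1" and a view function that sums a window's values, a main input element in "w1" would read 3 from the side input.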
The SDK supports viewing a PCollection, per window, as a single value, a List,
an Iterable, a Map, or a multimap (iterable-valued Map).
For a PCollection that contains a single value of type T per window, such as
the output of Combine.globally(org.apache.beam.sdk.transforms.SerializableFunction<java.lang.Iterable<V>, V>), use asSingleton() to prepare it for use as a
side input:
PCollectionView<T> output = someOtherPCollection
.apply(Combine.globally(...))
.apply(View.<T>asSingleton());
For a small PCollection with windows that can fit entirely in memory, use asList() to prepare it for use as a List. When read as a side input, the entire
list for a window will be cached in memory.
PCollectionView<List<T>> output =
smallPCollection.apply(View.<T>asList());
If a PCollection of KV<K, V> is known to have a single value per window for
each key, then use asMap() to view it as a Map<K, V>:
PCollectionView<Map<K, V>> output =
somePCollection.apply(View.<K, V>asMap());
Otherwise, to access a PCollection of KV<K, V> as a Map<K, Iterable<V>> side input, use asMultimap():
PCollectionView<Map<K, Iterable<V>>> output =
somePCollection.apply(View.<K, V>asMultimap());
To iterate over an entire window of a PCollection via side input, use asIterable():
PCollectionView<Iterable<T>> output =
somePCollection.apply(View.<T>asIterable());
Both asMultimap() and asMap() are useful for implementing lookup-based "joins" with the main input, when the side input is small enough to fit into memory.
For example, if you represent a page on a website via some Page object and have some
type UrlVisit logging that a URL was visited, you could convert these to more fully
structured PageVisit objects using a side input, something like the following:
PCollection<Page> pages = ... // pages fit into memory
PCollection<UrlVisit> urlVisits = ... // very large collection
final PCollectionView<Map<URL, Page>> urlToPageView = pages
    .apply(WithKeys.of( ... )) // extract the URL from the page
    .apply(View.<URL, Page>asMap());

PCollection<PageVisit> pageVisits = urlVisits
    .apply(ParDo.of(new DoFn<UrlVisit, PageVisit>() {
        @ProcessElement
        public void processElement(ProcessContext context) {
          UrlVisit urlVisit = context.element();
          Map<URL, Page> urlToPage = context.sideInput(urlToPageView);
          Page page = urlToPage.get(urlVisit.getUrl());
          context.output(new PageVisit(page, urlVisit.getVisitData()));
        }
    }).withSideInputs(urlToPageView));
See ParDo.SingleOutput.withSideInputs(org.apache.beam.sdk.values.PCollectionView<?>...) for details on how to access this variable
inside a ParDo over another PCollection.
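The lookup-based join can also be modeled without a pipeline to show why it works: the small side input is materialized once as an in-memory map, and each element of the large main input performs a single lookup. In this plain-Java sketch, Page, UrlVisit, and PageVisit are hypothetical stand-ins (URLs are plain Strings rather than URL objects), and the names indexByUrl and join are illustrative only. Because Map.get returns null for a missing key, the sketch drops visits with no matching page; a real DoFn may want a similar check.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of a lookup-based join; all types here are hypothetical stand-ins.
final class LookupJoinSketch {

  static final class Page {
    final String url;
    final String title;

    Page(String url, String title) {
      this.url = url;
      this.title = title;
    }
  }

  static final class UrlVisit {
    final String url;
    final String visitData;

    UrlVisit(String url, String visitData) {
      this.url = url;
      this.visitData = visitData;
    }
  }

  static final class PageVisit {
    final Page page;
    final String visitData;

    PageVisit(Page page, String visitData) {
      this.page = page;
      this.visitData = visitData;
    }
  }

  // Builds the small side input once, analogous to keying pages by URL and applying asMap().
  static Map<String, Page> indexByUrl(List<Page> pages) {
    Map<String, Page> urlToPage = new HashMap<>();
    for (Page p : pages) {
      urlToPage.put(p.url, p); // assumes one page per URL, as asMap() requires
    }
    return urlToPage;
  }

  // Streams the large main input past the in-memory map: one lookup per visit.
  static List<PageVisit> join(List<UrlVisit> visits, Map<String, Page> urlToPage) {
    List<PageVisit> out = new ArrayList<>();
    for (UrlVisit v : visits) {
      Page page = urlToPage.get(v.url);
      if (page != null) { // this sketch drops visits whose URL has no page
        out.add(new PageVisit(page, v.visitData));
      }
    }
    return out;
  }
}
```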
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | View.AsIterable<T> | For internal use only; no backwards-compatibility guarantees.
static class | View.AsList<T> | For internal use only; no backwards-compatibility guarantees.
static class | View.AsMap<K,V> | For internal use only; no backwards-compatibility guarantees.
static class | View.AsMultimap<K,V> | For internal use only; no backwards-compatibility guarantees.
static class | View.AsSingleton<T> | For internal use only; no backwards-compatibility guarantees.
static class | | For internal use only; no backwards-compatibility guarantees.
static class | | Provides an index-to-value mapping using a random starting index and also provides an offset range for each window seen.
Method Summary
Modifier and Type | Method | Description
static <T> View.AsIterable<T> | asIterable() | Returns a View.AsIterable transform that takes a PCollection as input and produces a PCollectionView mapping each window to an Iterable of the values in that window.
static <T> View.AsList<T> | asList() | Returns a View.AsList transform that takes a PCollection and returns a PCollectionView mapping each window to a List containing all of the elements in the window.
static <K,V> View.AsMap<K,V> | asMap() | Returns a View.AsMap transform that takes a PCollection<KV<K, V>> as input and produces a PCollectionView mapping each window to a Map<K, V>.
static <K,V> View.AsMultimap<K,V> | asMultimap() | Returns a View.AsMultimap transform that takes a PCollection<KV<K, V>> as input and produces a PCollectionView mapping each window to its contents as a Map<K, Iterable<V>> for use as a side input.
static <T> View.AsSingleton<T> | asSingleton() | Returns a View.AsSingleton transform that takes a PCollection with a single value per window as input and produces a PCollectionView that returns the value in the main input window when read as a side input.
Method Details
asSingleton
Returns a View.AsSingleton transform that takes a PCollection with a single value per window as input and produces a PCollectionView that returns the value in the main input window when read as a side input.
PCollection<InputT> input = ...
CombineFn<InputT, ?, OutputT> yourCombineFn = ...
PCollectionView<OutputT> output = input
    .apply(Combine.globally(yourCombineFn))
    .apply(View.<OutputT>asSingleton());
If the input PCollection is empty, throws NoSuchElementException in the consuming DoFn.
If the input PCollection contains more than one element, throws IllegalArgumentException in the consuming DoFn.
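The two failure cases can be summarized with a small plain-Java model of what reading an asSingleton() view means for one window's contents. This illustrates the documented contract only; it is not Beam's implementation, and the class name SingletonViewModel and the exception messages are hypothetical.

```java
import java.util.List;
import java.util.NoSuchElementException;

// Plain-Java model of the documented asSingleton() contract for ONE window's contents.
// Not Beam's implementation; the exception messages are illustrative only.
final class SingletonViewModel {

  static <T> T readSingleton(List<T> windowContents) {
    if (windowContents.isEmpty()) {
      // Documented: an empty input makes the consuming DoFn throw NoSuchElementException.
      throw new NoSuchElementException("empty window read as a singleton view");
    }
    if (windowContents.size() > 1) {
      // Documented: more than one element makes the consuming DoFn throw IllegalArgumentException.
      throw new IllegalArgumentException(
          "window with " + windowContents.size() + " elements read as a singleton view");
    }
    return windowContents.get(0);
  }
}
```

This is why the example feeds asSingleton() from Combine.globally: combining guarantees exactly one value per window.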
asList
Returns a View.AsList transform that takes a PCollection and returns a PCollectionView mapping each window to a List containing all of the elements in the window.
This view should only be used if random access and/or the size of the PCollection is required; asIterable() will perform significantly better for sequential access.
Some runners may require that the view fits in memory.
asIterable
Returns a View.AsIterable transform that takes a PCollection as input and produces a PCollectionView mapping each window to an Iterable of the values in that window.
Some runners may require that the view fits in memory.
asMap
Returns a View.AsMap transform that takes a PCollection<KV<K, V>> as input and produces a PCollectionView mapping each window to a Map<K, V>. It is required that each key of the input be associated with a single value, per window. If this is not the case, precede this view with Combine.perKey, as in the example below, or alternatively use asMultimap().
PCollection<KV<K, V>> input = ...
CombineFn<V, ?, OutputT> yourCombineFn = ...
PCollectionView<Map<K, OutputT>> output = input
    .apply(Combine.perKey(yourCombineFn))
    .apply(View.<K, OutputT>asMap());
Some runners may require that the view fits in memory.
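The one-value-per-key requirement, and why combining per key first resolves it, can be illustrated with a plain-Java model of one window's contents. Here combinePerKey is a hypothetical stand-in for Combine.perKey using a simple binary merge function, and asMap is a hypothetical helper that rejects duplicate keys with an IllegalArgumentException; the exact error Beam reports for duplicate keys is not modeled.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of ONE window's KV<K, V> contents being viewed as a Map<K, V>.
// combinePerKey and asMap are hypothetical names, not Beam APIs.
final class MapViewModel {

  // Stand-in for Combine.perKey: merges all values that share a key into one value.
  static <K, V> List<Map.Entry<K, V>> combinePerKey(
      List<Map.Entry<K, V>> kvs, BinaryOperator<V> combine) {
    Map<K, V> merged = new LinkedHashMap<>();
    for (Map.Entry<K, V> kv : kvs) {
      merged.merge(kv.getKey(), kv.getValue(), combine);
    }
    List<Map.Entry<K, V>> out = new ArrayList<>();
    for (Map.Entry<K, V> e : merged.entrySet()) {
      out.add(new SimpleEntry<>(e.getKey(), e.getValue()));
    }
    return out;
  }

  // Enforces the asMap() precondition: at most one value per key in the window.
  static <K, V> Map<K, V> asMap(List<Map.Entry<K, V>> kvs) {
    Map<K, V> view = new HashMap<>();
    for (Map.Entry<K, V> kv : kvs) {
      if (view.containsKey(kv.getKey())) {
        throw new IllegalArgumentException("duplicate key in window: " + kv.getKey());
      }
      view.put(kv.getKey(), kv.getValue());
    }
    return view;
  }
}
```

With the pairs ("a", 1), ("a", 2), ("b", 5), calling asMap directly fails, while combining first with Integer::sum yields a map with "a" mapped to 3 and "b" mapped to 5.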
asMultimap
Returns a View.AsMultimap transform that takes a PCollection<KV<K, V>> as input and produces a PCollectionView mapping each window to its contents as a Map<K, Iterable<V>> for use as a side input. In contrast to asMap(), it is not required that the keys in the input collection be unique.
PCollection<KV<K, V>> input = ... // maybe more than one occurrence of some keys
PCollectionView<Map<K, Iterable<V>>> output = input.apply(View.<K, V>asMultimap());
Some runners may require that the view fits in memory.
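A multimap view simply groups every value under its key, so repeated keys are not an error. This plain-Java sketch models one window's Map<K, Iterable<V>>; the class and method names are hypothetical, and the insertion order it preserves comes from its use of LinkedHashMap, so it should not be read as an ordering guarantee of the real view.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of ONE window's KV<K, V> contents viewed as Map<K, Iterable<V>>.
// The class and method names are hypothetical; this is not Beam's implementation.
final class MultimapViewModel {

  static <K, V> Map<K, Iterable<V>> asMultimap(List<Map.Entry<K, V>> kvs) {
    // Unlike asMap(), repeated keys are fine: each value is appended to its key's group.
    Map<K, List<V>> grouped = new LinkedHashMap<>();
    for (Map.Entry<K, V> kv : kvs) {
      grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
    }
    // Copy into a map whose value type is Iterable<V>, matching the view's shape.
    return new LinkedHashMap<K, Iterable<V>>(grouped);
  }
}
```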