Class View
Transforms for creating PCollectionViews from PCollections (to read them as side inputs).
While a PCollection<ElemT> has many values of type ElemT per window, a PCollectionView<ViewT> has a single value of type ViewT for each window. It can be thought of as a mapping from windows to values of type ViewT. The transforms here represent ways of converting the ElemT values in a window into a ViewT for that window.
When a ParDo transform is processing a main input element in a window w and a PCollectionView is read via DoFn.ProcessContext.sideInput(PCollectionView<T>), the value of the view for w is returned.
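The window-to-value mapping described above can be pictured with a small plain-Java model. This is a sketch under stated assumptions, not the Beam API: windows are modeled as hypothetical string labels, the per-window reduction is assumed to be a sum, and the names SideInputModel, SideElement, buildSumView and sideInputFor are illustrative only.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of a PCollectionView<Integer> (not Beam code):
// windows are string labels and the view holds one value per window.
final class SideInputModel {

    // A side-input element tagged with the (hypothetical) window it falls into.
    record SideElement(String window, int value) {}

    // Build the view: reduce all elements of each window to a single value
    // (a sum, by assumption), yielding a mapping window -> ViewT.
    static Map<String, Integer> buildSumView(List<SideElement> sideInput) {
        Map<String, Integer> view = new TreeMap<>();
        for (SideElement e : sideInput) {
            view.merge(e.window(), e.value(), Integer::sum);
        }
        return view;
    }

    // A main-input element in window w sees only the view's value for w.
    // Returns null when this model has no side-input data for w.
    static Integer sideInputFor(Map<String, Integer> view, String mainInputWindow) {
        return view.get(mainInputWindow);
    }

    public static void main(String[] args) {
        Map<String, Integer> view = buildSumView(List.of(
            new SideElement("w1", 2),
            new SideElement("w1", 3),
            new SideElement("w2", 10)));
        System.out.println(sideInputFor(view, "w1")); // 5
        System.out.println(sideInputFor(view, "w2")); // 10
    }
}
```

The point of the model is that two main-input elements in different windows read different view values, even though they reference the same view.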
The SDK supports viewing a PCollection, per window, as a single value, a List, an Iterable, a Map, or a multimap (iterable-valued Map).
For a PCollection that contains a single value of type T per window, such as the output of Combine.globally(SerializableFunction<Iterable<V>, V>), use asSingleton() to prepare it for use as a side input:
    PCollectionView<T> output = someOtherPCollection
        .apply(Combine.globally(...))
        .apply(View.<T>asSingleton());
For a small PCollection with windows that can fit entirely in memory, use asList() to prepare it for use as a List. When read as a side input, the entire list for a window will be cached in memory.
    PCollectionView<List<T>> output =
        smallPCollection.apply(View.<T>asList());
If a PCollection of KV<K, V> is known to have a single value per window for each key, then use asMap() to view it as a Map<K, V>:
    PCollectionView<Map<K, V>> output =
        somePCollection.apply(View.<K, V>asMap());
Otherwise, to access a PCollection of KV<K, V> as a Map<K, Iterable<V>> side input, use asMultimap():
    PCollectionView<Map<K, Iterable<V>>> output =
        somePCollection.apply(View.<K, V>asMultimap());
To iterate over an entire window of a PCollection via side input, use asIterable():
    PCollectionView<Iterable<T>> output =
        somePCollection.apply(View.<T>asIterable());
Both asMultimap() and asMap() are useful for implementing lookup-based "joins" with the main input when the side input is small enough to fit into memory.
For example, if you represent a page on a website via some Page object and have some type UrlVisit logging that a URL was visited, you could convert these to more fully structured PageVisit objects using a side input, something like the following:
    PCollection<Page> pages = ... // pages fit into memory
    PCollection<UrlVisit> urlVisits = ... // very large collection
    final PCollectionView<Map<URL, Page>> urlToPageView = pages
        .apply(WithKeys.of( ... )) // extract the URL from the page
        .apply(View.<URL, Page>asMap());
    PCollection<PageVisit> pageVisits = urlVisits
        .apply(ParDo.of(new DoFn<UrlVisit, PageVisit>() {
              @ProcessElement
              public void processElement(ProcessContext context) {
                UrlVisit urlVisit = context.element();
                // Read the whole map for the current element's window.
                Map<URL, Page> urlToPage = context.sideInput(urlToPageView);
                Page page = urlToPage.get(urlVisit.getUrl());
                context.output(new PageVisit(page, urlVisit.getVisitData()));
              }
            })
            // Register the view as a side input of this ParDo.
            .withSideInputs(urlToPageView));
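To show the shape of this lookup join without a pipeline, here is a plain-Java sketch. It is not Beam code: Page, UrlVisit and PageVisit are simplified hypothetical records with String URLs, and rejecting duplicate URLs with IllegalArgumentException is a modeling choice standing in for asMap()'s one-value-per-key requirement, not a claim about the exact exception Beam raises.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the lookup-based join (not Beam code).
// Page, UrlVisit and PageVisit are simplified stand-ins that use String URLs.
final class LookupJoinModel {
    record Page(String url, String title) {}
    record UrlVisit(String url, String visitData) {}
    record PageVisit(Page page, String visitData) {}

    // Analogue of WithKeys + View.asMap(): key each page by its URL.
    // This model rejects duplicate URLs, mirroring asMap()'s single-value-per-key rule.
    static Map<String, Page> urlToPage(List<Page> pages) {
        Map<String, Page> map = new HashMap<>();
        for (Page p : pages) {
            if (map.putIfAbsent(p.url(), p) != null) {
                throw new IllegalArgumentException("duplicate URL: " + p.url());
            }
        }
        return map;
    }

    // Analogue of the DoFn: every main-input visit does an in-memory lookup.
    // The resulting page is null when a visited URL has no matching Page.
    static List<PageVisit> join(List<UrlVisit> visits, Map<String, Page> urlToPage) {
        List<PageVisit> out = new ArrayList<>();
        for (UrlVisit v : visits) {
            Page page = urlToPage.get(v.url());
            out.add(new PageVisit(page, v.visitData()));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Page> index = urlToPage(List.of(
            new Page("/a", "Home"), new Page("/b", "About")));
        List<PageVisit> visits = join(List.of(
            new UrlVisit("/a", "t=1"), new UrlVisit("/b", "t=2")), index);
        visits.forEach(System.out::println);
    }
}
```

The small collection is materialized once as a map, while the large collection is streamed past it; that asymmetry is why the side input must fit in memory.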
See ParDo.SingleOutput.withSideInputs(PCollectionView<?>...) for details on how to access this variable inside a ParDo over another PCollection.
Nested Class Summary
All nested classes are static. Six of them, including View.AsMap<K, V>, are documented as "For internal use only; no backwards-compatibility guarantees." The remaining nested class provides an index to value mapping using a random starting index and also provides an offset range for each window seen.
Method Summary
static <T> View.AsIterable<T> asIterable(): Returns a View.AsIterable transform that takes a PCollection as input and produces a PCollectionView mapping each window to an Iterable of the values in that window.
static <T> View.AsList<T> asList(): Returns a View.AsList transform that takes a PCollection and returns a PCollectionView mapping each window to a List containing all of the elements in the window.
static <K, V> View.AsMap<K, V> asMap(): Returns a View.AsMap transform that takes a PCollection<KV<K, V>> as input and produces a PCollectionView mapping each window to a Map<K, V>.
static <K, V> View.AsMultimap<K, V> asMultimap(): Returns a View.AsMultimap transform that takes a PCollection<KV<K, V>> as input and produces a PCollectionView mapping each window to its contents as a Map<K, Iterable<V>> for use as a side input.
static <T> View.AsSingleton<T> asSingleton(): Returns a View.AsSingleton transform that takes a PCollection with a single value per window as input and produces a PCollectionView that returns the value in the main input window when read as a side input.
Method Details
asSingleton
Returns a View.AsSingleton transform that takes a PCollection with a single value per window as input and produces a PCollectionView that returns the value in the main input window when read as a side input.
    PCollection<InputT> input = ...
    CombineFn<InputT, ?, OutputT> yourCombineFn = ...
    PCollectionView<OutputT> output = input
        .apply(Combine.globally(yourCombineFn))
        .apply(View.<OutputT>asSingleton());
If the input PCollection is empty, throws NoSuchElementException in the consuming DoFn.
If the input PCollection contains more than one element, throws IllegalArgumentException in the consuming DoFn.
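The two failure cases above, and why combining first avoids them, can be modeled in plain Java. This is a sketch, not Beam's implementation: windowContents is a hypothetical stand-in for the input elements of one window, and combineGloballySum stands in for Combine.globally with a summing CombineFn.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical plain-Java model of reading an asSingleton() view for one window
// (not Beam code); windowContents stands for the input elements in that window.
final class SingletonViewModel {

    // Mirrors the documented behavior: no element -> NoSuchElementException,
    // more than one element -> IllegalArgumentException.
    static <T> T readSingleton(List<T> windowContents) {
        Iterator<T> it = windowContents.iterator();
        if (!it.hasNext()) {
            throw new NoSuchElementException("empty input for this window");
        }
        T value = it.next();
        if (it.hasNext()) {
            throw new IllegalArgumentException("more than one element in this window");
        }
        return value;
    }

    // Stand-in for Combine.globally with a summing CombineFn: in this model it
    // always emits exactly one value for the window, so the singleton read succeeds.
    static List<Integer> combineGloballySum(List<Integer> windowContents) {
        int sum = 0;
        for (int x : windowContents) {
            sum += x;
        }
        return List.of(sum);
    }

    public static void main(String[] args) {
        List<Integer> raw = List.of(1, 2, 3);
        System.out.println(readSingleton(combineGloballySum(raw))); // 6
        try {
            readSingleton(raw);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```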
asList
Returns a View.AsList transform that takes a PCollection and returns a PCollectionView mapping each window to a List containing all of the elements in the window.
This view should only be used if random access and/or the size of the PCollection is required. asIterable() will perform significantly better for sequential access.
Some runners may require that the view fits in memory.
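As a rough guide to choosing between asList() and asIterable(), the plain-Java sketch below (not Beam code; the class and method names are hypothetical) contrasts a computation that needs only one sequential pass, which an Iterable view is enough for, with one that needs size() and get(i), the case the List view is meant for.

```java
import java.util.List;

// Hypothetical illustration of access patterns (not Beam code).
final class ListVsIterableModel {

    // Sequential, single-pass access: an Iterable<T> (asIterable()) is enough.
    static long sum(Iterable<Integer> values) {
        long total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    // Needs size() and random access by index, which a List<T> (asList()) offers.
    // Returns the element at the middle index (the upper middle for even sizes).
    static int middle(List<Integer> sortedValues) {
        if (sortedValues.isEmpty()) {
            throw new IllegalArgumentException("empty window");
        }
        return sortedValues.get(sortedValues.size() / 2);
    }

    public static void main(String[] args) {
        List<Integer> window = List.of(1, 3, 5, 7);
        System.out.println(sum(window));    // 16
        System.out.println(middle(window)); // 5
    }
}
```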
asIterable
Returns a View.AsIterable transform that takes a PCollection as input and produces a PCollectionView mapping each window to an Iterable of the values in that window.
Some runners may require that the view fits in memory.
asMap
Returns a View.AsMap transform that takes a PCollection<KV<K, V>> as input and produces a PCollectionView mapping each window to a Map<K, V>. It is required that each key of the input be associated with a single value, per window. If this is not the case, precede this view with Combine.perKey, as in the example below, or alternatively use asMultimap().
    PCollection<KV<K, V>> input = ...
    CombineFn<V, ?, OutputT> yourCombineFn = ...
    PCollectionView<Map<K, OutputT>> output = input
        .apply(Combine.perKey(yourCombineFn))
        .apply(View.<K, OutputT>asMap());
Some runners may require that the view fits in memory.
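The single-value-per-key requirement, and how a per-key combine satisfies it, can be modeled in plain Java. This is a sketch under stated assumptions, not the Beam API: Map.Entry stands in for KV<K, V>, combinePerKeySum is a hypothetical stand-in for Combine.perKey with a summing CombineFn, and throwing IllegalArgumentException on a repeated key is this model's choice rather than a statement about what Beam raises.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of asMap() for one window (not Beam code);
// Map.Entry stands in for KV<K, V>.
final class AsMapModel {

    // asMap() requires a single value per key; this model throws on a repeated key.
    static <K, V> Map<K, V> asMap(List<Map.Entry<K, V>> kvs) {
        Map<K, V> map = new HashMap<>();
        for (Map.Entry<K, V> kv : kvs) {
            if (map.containsKey(kv.getKey())) {
                throw new IllegalArgumentException("duplicate key: " + kv.getKey());
            }
            map.put(kv.getKey(), kv.getValue());
        }
        return map;
    }

    // Stand-in for Combine.perKey with a summing CombineFn: collapses every key
    // to one value, so its output always satisfies asMap()'s requirement.
    static <K> List<Map.Entry<K, Integer>> combinePerKeySum(List<Map.Entry<K, Integer>> kvs) {
        Map<K, Integer> sums = new LinkedHashMap<>();
        for (Map.Entry<K, Integer> kv : kvs) {
            sums.merge(kv.getKey(), kv.getValue(), Integer::sum);
        }
        List<Map.Entry<K, Integer>> out = new ArrayList<>();
        for (Map.Entry<K, Integer> e : sums.entrySet()) {
            out.add(Map.entry(e.getKey(), e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input =
            List.of(Map.entry("a", 1), Map.entry("a", 2), Map.entry("b", 5));
        System.out.println(asMap(combinePerKeySum(input)).get("a")); // 3
    }
}
```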
asMultimap
Returns a View.AsMultimap transform that takes a PCollection<KV<K, V>> as input and produces a PCollectionView mapping each window to its contents as a Map<K, Iterable<V>> for use as a side input. In contrast to asMap(), it is not required that the keys in the input collection be unique.
    PCollection<KV<K, V>> input = ... // some keys may occur more than once
    PCollectionView<Map<K, Iterable<V>>> output =
        input.apply(View.<K, V>asMultimap());
Some runners may require that the view fits in memory.
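The grouping that asMultimap() performs can be sketched in plain Java. This is an illustrative model, not Beam code: Map.Entry stands in for KV<K, V>, a List<V> stands in for the Iterable<V> values, and keeping keys in first-seen order is this model's choice, not a documented ordering guarantee.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of asMultimap() for one window (not Beam code);
// Map.Entry stands in for KV<K, V>, and List<V> for Iterable<V>.
final class AsMultimapModel {

    // Keys may repeat: every value is kept, grouped under its key
    // (keys appear in first-seen order in this model).
    static <K, V> Map<K, List<V>> asMultimap(List<Map.Entry<K, V>> kvs) {
        Map<K, List<V>> grouped = new LinkedHashMap<>();
        for (Map.Entry<K, V> kv : kvs) {
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input =
            List.of(Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3));
        System.out.println(asMultimap(input)); // {a=[1, 3], b=[2]}
    }
}
```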