@Experimental public final class SketchFrequencies extends java.lang.Object
PTransforms to compute the estimated frequency of each element in a stream.
This class uses the Count-min sketch structure, which allows very efficient queries on a summary of the data stream.
The implementation comes from Addthis' Stream-lib library.
The papers and other useful information about Count-Min Sketch are available on this website.
Two parameters can be tuned in order to control the accuracy of the computation:
The relative error "epsilon" controls the accuracy of the estimation. By default, the relative error is around 1% of the total count.
The confidence "confidence", between 0 and 1 (1 being of course impossible), controls how likely the estimate is to fall within that relative error.
These two parameters determine the size of the Count-min sketch, which is a two-dimensional array with depth and width defined as follows:
width = ceil(2 / epsilon)
depth = ceil(-log(1 - confidence) / log(2))
With the default values, this gives a width of 200 and a depth of 10.
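To make the sizing formulas concrete, here is a minimal toy Count-min sketch in plain Java. It is an illustration only and not the Stream-lib implementation: the class name `ToyCountMinSketch`, the per-row hash, and the restriction to `String` elements are all assumptions made for this sketch. The confidence value 0.999 is likewise assumed; it is simply a value for which the formulas produce the dimensions quoted above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Toy Count-min sketch for illustration; NOT the Stream-lib implementation. */
public class ToyCountMinSketch {
    public final int width;
    public final int depth;
    private final long[][] table;

    public ToyCountMinSketch(double epsilon, double confidence) {
        // Sizing formulas quoted in the documentation above.
        this.width = (int) Math.ceil(2 / epsilon);
        this.depth = (int) Math.ceil(-Math.log(1 - confidence) / Math.log(2));
        this.table = new long[depth][width];
    }

    // Hypothetical per-row hash: mixes the element's bytes with the row index.
    private int bucket(String element, int row) {
        int h = Arrays.hashCode(element.getBytes(StandardCharsets.UTF_8));
        h = h * 31 + row * 0x9E3779B9; // int overflow is intentional
        h ^= (h >>> 16);
        return Math.floorMod(h, width);
    }

    /** Adds {@code count} occurrences of the element to one cell in every row. */
    public void add(String element, long count) {
        for (int row = 0; row < depth; row++) {
            table[row][bucket(element, row)] += count;
        }
    }

    /** Returns the smallest matching cell; collisions can only inflate it. */
    public long estimateCount(String element) {
        long min = Long.MAX_VALUE;
        for (int row = 0; row < depth; row++) {
            min = Math.min(min, table[row][bucket(element, row)]);
        }
        return min;
    }

    public static void main(String[] args) {
        // epsilon = 0.01 (1%); confidence = 0.999 is an assumed value.
        ToyCountMinSketch sketch = new ToyCountMinSketch(0.01, 0.999);
        System.out.println("width=" + sketch.width + ", depth=" + sketch.depth);
        sketch.add("a", 1000);
        sketch.add("b", 3);
        System.out.println("estimate(a)=" + sketch.estimateCount("a"));
    }
}
```

Because every cell only ever grows, the estimate in this toy version can exceed the true count but never fall below it.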
WARNING: The relative error is relative to the total count of the stream, not to the count
of an individual element. Thus, an element having 1000 occurrences in a stream of 1 million
elements will have 1% of 1 million as error, i.e. 10 000. This means the estimated frequency
of this element is 1000 with an error of up to 10 000. The relative error must therefore
be set very low for very large streams.
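The arithmetic in this warning can be sketched in a few lines of plain Java. The class and method names below (`SketchErrorBudget`, `absoluteError`, `epsilonFor`) are hypothetical helpers introduced for illustration; they are not part of Beam or Stream-lib.

```java
/** Back-of-the-envelope error budget for a Count-min sketch (illustrative helper). */
public class SketchErrorBudget {

    /** Absolute error implied by a relative error applied to the whole stream. */
    public static double absoluteError(double epsilon, long streamSize) {
        return epsilon * streamSize;
    }

    /** Relative error needed to keep the absolute error at a given target. */
    public static double epsilonFor(double targetAbsoluteError, long streamSize) {
        return targetAbsoluteError / streamSize;
    }

    public static void main(String[] args) {
        long streamSize = 1_000_000L;
        // 1% of one million, the case described in the warning.
        System.out.println(absoluteError(0.01, streamSize));
        // Relative error needed so that a count of 1000 is off by at most 100.
        System.out.println(epsilonFor(100.0, streamSize));
    }
}
```

Under these assumptions, keeping the error for the 1000-occurrence element within 100 would call for a relative error of about 0.0001 rather than the default 0.01.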
Also keep in mind that this algorithm works well on highly skewed data but gives poor
results if the elements are evenly distributed.
There are 2 ways of using this class:
PTransforms that return a PCollection singleton containing a Count-min sketch for querying the estimated number of hits of the elements.
SketchFrequencies.CountMinSketchFn, a CombineFn that is exposed in order to allow advanced processing involving the Count-min sketch.
The simplest use is to call the globally() or perKey() method in order to retrieve the sketch with an estimated number of hits for each element in the stream.
PCollection<MyObject> pc = ...;
PCollection<Sketch<MyObject>> countMinSketch = pc.apply(SketchFrequencies
    .<MyObject>globally()); // or SketchFrequencies.<K, V>perKey() for keyed input
One can tune the epsilon and confidence parameters in order to control accuracy and memory.
The tuning works exactly the same for globally() and perKey().
double eps = 0.001;
double conf = 0.9999;
PCollection<MyObject> pc = ...;
PCollection<Sketch<MyObject>> countMinSketch = pc.apply(SketchFrequencies
    .<MyObject>globally() // or SketchFrequencies.<K, V>perKey() for keyed input
    .withRelativeError(eps)
    .withConfidence(conf));
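To get a feel for the memory side of this trade-off, the following plain-Java sketch applies the sizing formulas from the top of this page to the tuned values above. The class name `SketchFootprint` is hypothetical, and the figure of 8 bytes per counter is an assumption (one `long` per cell); the actual footprint of the underlying sketch may differ.

```java
/** Rough size estimate for a Count-min sketch (illustrative helper, not a Beam API). */
public class SketchFootprint {

    /** Number of counters: width * depth, using the documented sizing formulas. */
    public static long cells(double epsilon, double confidence) {
        long width = (long) Math.ceil(2 / epsilon);
        long depth = (long) Math.ceil(-Math.log(1 - confidence) / Math.log(2));
        return width * depth;
    }

    /** Approximate bytes, ASSUMING one 8-byte long per counter. */
    public static long approxBytes(double epsilon, double confidence) {
        return cells(epsilon, confidence) * Long.BYTES;
    }

    public static void main(String[] args) {
        // Tuned values from the example above.
        System.out.println("cells: " + cells(0.001, 0.9999));
        System.out.println("approx bytes: " + approxBytes(0.001, 0.9999));
    }
}
```

With these values the formulas give a width of 2000 and a depth of 14, i.e. 28 000 counters. Dividing epsilon by ten multiplies the width by ten, while tightening the confidence only adds a few rows.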
This example shows how to query the resulting SketchFrequencies.Sketch.
To estimate the number of hits of an element, one has to use the
SketchFrequencies.Sketch.estimateCount(Object, Coder) method and provide
the coder for the element type.
For instance, one can build a KV pair linking each element to an estimate
of its frequency, using the sketch as a side input of a ParDo.
PCollection<MyObject> pc = ...;
PCollection<Sketch<MyObject>> countMinSketch = pc.apply(SketchFrequencies
    .<MyObject>globally());
// Retrieve the coder for MyObject
final Coder<MyObject> coder = pc.getCoder();
// Build a view of the sketch so it can be passed as a side input
final PCollectionView<Sketch<MyObject>> sketchView = countMinSketch.apply(View
    .<Sketch<MyObject>>asSingleton());
PCollection<KV<MyObject, Long>> pairs = pc.apply(ParDo.of(
    new DoFn<MyObject, KV<MyObject, Long>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        MyObject elem = c.element();
        Sketch<MyObject> sketch = c.sideInput(sketchView);
        // Emit the element paired with its estimated frequency
        c.output(KV.of(elem, sketch.estimateCount(elem, coder)));
      }}).withSideInputs(sketchView));
The CombineFn does the same thing as the PTransforms, but it can be used for
stateful processing or in a CombineFns.ComposedCombineFn.
The following example is deliberately simple, but it shows how to properly create
a SketchFrequencies.CountMinSketchFn. One must always specify a coder using the
SketchFrequencies.CountMinSketchFn.create(Coder) method.
double eps = 0.0001;
double conf = 0.9999;
PCollection<MyObject> input = ...;
PCollection<Sketch<MyObject>> output = input.apply(Combine.globally(CountMinSketchFn
    .<MyObject>create(new MyObjectCoder())
    .withAccuracy(eps, conf)));
Warning: this class is experimental.
Its API is subject to change in future versions of Beam.
Modifier and Type | Class and Description |
---|---|
static class | SketchFrequencies.CountMinSketchFn<InputT>: Implements the Combine.CombineFn of SketchFrequencies transforms. |
static class | SketchFrequencies.GlobalSketch<InputT>: Implementation of globally(). |
static class | SketchFrequencies.PerKeySketch<K,V>: Implementation of perKey(). |
static class | SketchFrequencies.Sketch<T>: Wrap StreamLib's Count-Min Sketch to support counting all user types by hashing the encoded user type using the supplied deterministic coder. |
Constructor and Description |
---|
SketchFrequencies() |
Modifier and Type | Method and Description |
---|---|
static <InputT> SketchFrequencies.GlobalSketch<InputT> | globally(): Create the PTransform that will build a Count-min sketch for keeping track of the frequency of the elements in the whole stream. |
static <K,V> SketchFrequencies.PerKeySketch<K,V> | perKey(): Like globally() but per key, i.e. it builds a Count-min sketch per key in a PCollection<KV<K, V>> and returns a PCollection<KV<K, Sketch<V>>>. |
public static <InputT> SketchFrequencies.GlobalSketch<InputT> globally()
Create the PTransform that will build a Count-min sketch for keeping track
of the frequency of the elements in the whole stream.
It returns a PCollection<Sketch<InputT>> that can be queried in order to
obtain estimates of the elements' frequencies.
InputT - the type of the elements in the input PCollection
public static <K,V> SketchFrequencies.PerKeySketch<K,V> perKey()
Like globally() but per key, i.e. it builds a Count-min sketch per key
in a PCollection<KV<K, V>> and returns a PCollection<KV<K, Sketch<V>>>.
K - type of the keys mapping the elements
V - type of the values being combined per key