Class SketchFrequencies
PTransforms to compute the estimated frequency of each element in a stream.
This class uses the Count-min Sketch structure, which allows very efficient queries on the data stream summarization.
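To make the structure concrete, here is a minimal, illustrative count-min sketch in plain Java. It is a toy written for this explanation, not the Stream-lib implementation: the class name `ToyCountMinSketch` and its per-row hashing scheme are assumptions. Items are passed as byte arrays, loosely mirroring the idea of hashing an encoded element.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Toy count-min sketch for illustration only (hypothetical, not Stream-lib's code). */
final class ToyCountMinSketch {
  private final long[][] table; // depth rows of width counters
  private final int depth;
  private final int width;

  ToyCountMinSketch(int depth, int width) {
    this.depth = depth;
    this.width = width;
    this.table = new long[depth][width];
  }

  // One bucket per row; the row index perturbs the hash (assumed scheme).
  private int bucket(byte[] item, int row) {
    int h = Arrays.hashCode(item) * 31 + row * 0x9E3779B9;
    h ^= (h >>> 16);
    return Math.floorMod(h, width);
  }

  /** Adds {@code count} occurrences of the item: one counter per row is incremented. */
  void add(byte[] item, long count) {
    for (int row = 0; row < depth; row++) {
      table[row][bucket(item, row)] += count;
    }
  }

  /** Returns the minimum counter over all rows: never below the true count. */
  long estimateCount(byte[] item) {
    long min = Long.MAX_VALUE;
    for (int row = 0; row < depth; row++) {
      min = Math.min(min, table[row][bucket(item, row)]);
    }
    return min;
  }

  /** Helper that stands in for an element coder in this toy example. */
  static byte[] bytes(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }
}
```

A query only reads one counter per row, which is why lookups stay cheap regardless of the stream size. Collisions can only inflate a counter, so the estimate errs on the high side.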
References
The implementation comes from Addthis' Stream-lib library.
The papers and other useful information about Count-Min Sketch are available on this website.
Parameters
Two parameters can be tuned in order to control the accuracy of the computation:
- Relative Error: The relative error "epsilon" controls the accuracy of the estimation. By default, the relative error is around 1% of the total count.
- Confidence: The relative error can be guaranteed only with a certain "confidence", between 0 and 1 (1 being of course impossible). The default value is set to 0.999, meaning that we can guarantee that the relative error will not exceed 1% of the total count in 99.9% of cases.
These two parameters determine the size of the Count-min sketch, which is a two-dimensional array with depth and width defined as follows:
width = ceil(2 / epsilon)
depth = ceil(-log(1 - confidence) / log(2))
With the default values, this gives a width of 200 and a depth of 10.
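As a quick check of the two formulas above, the following minimal sketch evaluates them in plain Java. The class and method names (`SketchDimensions`, `width`, `depth`) are hypothetical helpers for this illustration and are not part of the Beam or Stream-lib API.

```java
/** Evaluates the width and depth formulas from the text (illustrative helper). */
final class SketchDimensions {
  // width = ceil(2 / epsilon)
  static int width(double epsilon) {
    return (int) Math.ceil(2 / epsilon);
  }

  // depth = ceil(-log(1 - confidence) / log(2))
  static int depth(double confidence) {
    return (int) Math.ceil(-Math.log(1 - confidence) / Math.log(2));
  }

  // Total number of counters held by a sketch with these parameters.
  static long counters(double epsilon, double confidence) {
    return (long) width(epsilon) * depth(confidence);
  }
}
```

With epsilon = 0.01 and confidence = 0.999, this evaluates to a width of 200 and a depth of 10, i.e. 2000 counters. Dividing epsilon by 10 multiplies the width by 10, while raising the confidence only adds a few rows.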
WARNING: The relative error applies to the total number of elements in the stream, not to the count of an individual element.
Thus, an element having 1000 occurrences in a stream of 1 million elements will have 1% of 1 million as relative error, i.e. 10 000. This means the frequency is 1000 +/- 10 000 for this element. It is therefore clear that the relative error must be set very low for very large streams.
Also keep in mind that this algorithm works well on highly skewed data but gives poor results if the elements are evenly distributed.
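The arithmetic in the warning above can be reproduced with a short sketch. The helper names (`ErrorBound`, `absoluteError`, `epsilonFor`) are hypothetical and only restate the multiplication described in the text; they are not part of any library.

```java
/** Restates the warning's arithmetic: absolute error = epsilon * stream size. */
final class ErrorBound {
  // Absolute error implied by a relative error applied to the whole stream.
  static double absoluteError(double epsilon, long streamSize) {
    return epsilon * streamSize;
  }

  // Relative error needed to keep the absolute error within a target.
  static double epsilonFor(double targetAbsoluteError, long streamSize) {
    return targetAbsoluteError / streamSize;
  }
}
```

For the stream of 1 million used in the warning, `absoluteError(0.01, 1_000_000)` gives 10 000. Under the same assumptions, keeping the error within 100 for that stream would call for an epsilon of about 0.0001.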
Examples
There are 2 ways of using this class:
- Use the PTransforms that return a PCollection singleton that contains a Count-min sketch for querying the estimated number of hits of the elements.
- Use the SketchFrequencies.CountMinSketchFn CombineFn that is exposed in order to perform advanced processing involving the Count-Min sketch.
Example 1: default use
The simplest use is to call the globally() or perKey() method in order to retrieve the sketch with an estimated number of hits for each element in the stream.
PCollection<MyObject> pc = ...;
PCollection<SketchFrequencies.Sketch<MyObject>> countMinSketch = pc.apply(SketchFrequencies
    .<MyObject>globally()); //.<MyObject>perKey();
Example 2: tune accuracy parameters
One can tune the epsilon and confidence parameters in order to control accuracy and memory.
The tuning works exactly the same for globally() and perKey().
double eps = 0.001;
double conf = 0.9999;
PCollection<MyObject> pc = ...;
PCollection<SketchFrequencies.Sketch<MyObject>> countMinSketch = pc.apply(SketchFrequencies
    .<MyObject>globally() //.<MyObject>perKey()
    .withRelativeError(eps)
    .withConfidence(conf));
Example 3: query the resulting sketch
This example shows how to query the resulting SketchFrequencies.Sketch. To estimate the number of hits of an element, one has to use the SketchFrequencies.Sketch.estimateCount(Object, Coder) method and provide the coder for the element type.
For instance, one can build a KV pair linking each element to an estimation of its frequency, using the sketch as a side input of a ParDo.
PCollection<MyObject> pc = ...;
PCollection<SketchFrequencies.Sketch<MyObject>> countMinSketch = pc.apply(SketchFrequencies
    .<MyObject>globally());
// Retrieve the coder for MyObject
final Coder<MyObject> coder = pc.getCoder();
// Build a view of the sketch so it can be passed as a side input
final PCollectionView<SketchFrequencies.Sketch<MyObject>> sketchView = countMinSketch.apply(View
    .<SketchFrequencies.Sketch<MyObject>>asSingleton());
PCollection<KV<MyObject, Long>> pairs = pc.apply(ParDo.of(
    new DoFn<MyObject, KV<MyObject, Long>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        MyObject elem = c.element();
        SketchFrequencies.Sketch<MyObject> sketch = c.sideInput(sketchView);
        // Emit the element together with its estimated frequency
        c.output(KV.of(elem, sketch.estimateCount(elem, coder)));
      }}).withSideInputs(sketchView));
Example 4: Using the CombineFn
The CombineFn does the same thing as the PTransforms but it can be used for doing stateful processing or in CombineFns.ComposedCombineFn.
This example is not really interesting but it shows how you can properly create a SketchFrequencies.CountMinSketchFn. One must always specify a coder using the SketchFrequencies.CountMinSketchFn.create(Coder) method.
double eps = 0.0001;
double conf = 0.9999;
PCollection<MyObject> input = ...;
PCollection<SketchFrequencies.Sketch<MyObject>> output = input.apply(Combine.globally(CountMinSketchFn
    .<MyObject>create(new MyObjectCoder())
    .withAccuracy(eps, conf)));
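A combine function has to merge partial results computed on different parts of the input. For count-min sketches that share the same dimensions and hash functions, merging amounts to adding the counter tables cell by cell. The following plain-Java sketch illustrates that idea only; the `SketchMerge` helper is hypothetical and does not claim to reproduce what SketchFrequencies.CountMinSketchFn does internally.

```java
/** Illustrates merging two count-min counter tables (hypothetical helper). */
final class SketchMerge {
  // Element-wise sum; both tables must have the same depth and width.
  static long[][] merge(long[][] a, long[][] b) {
    if (a.length != b.length || a.length == 0 || a[0].length != b[0].length) {
      throw new IllegalArgumentException("Tables must share the same non-empty shape");
    }
    long[][] merged = new long[a.length][a[0].length];
    for (int row = 0; row < a.length; row++) {
      for (int col = 0; col < a[row].length; col++) {
        merged[row][col] = a[row][col] + b[row][col];
      }
    }
    return merged;
  }
}
```

Because addition is associative and commutative, partial sketches can be merged in any order, which is the property a combine function relies on.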
Nested Class Summary
Nested Classes
- static class SketchFrequencies.CountMinSketchFn: Implements the Combine.CombineFn of SketchFrequencies transforms.
- static class SketchFrequencies.GlobalSketch: Implementation of globally().
- static class SketchFrequencies.PerKeySketch: Implementation of perKey().
- static class SketchFrequencies.Sketch: Wraps StreamLib's Count-Min Sketch to support counting all user types by hashing the encoded user type using the supplied deterministic coder.
Constructor Summary
Constructors
- SketchFrequencies()
Method Summary
- static <InputT> SketchFrequencies.GlobalSketch<InputT> globally(): Create the PTransform that will build a Count-min sketch for keeping track of the frequency of the elements in the whole stream.
- static <K, V> SketchFrequencies.PerKeySketch<K, V> perKey(): Like globally() but per key, i.e. a Count-min sketch per key in PCollection<KV<K, V>>, and returns a PCollection<KV<K, CountMinSketch>>.
Constructor Details
SketchFrequencies
public SketchFrequencies()
Method Details
globally
Create the PTransform that will build a Count-min sketch for keeping track of the frequency of the elements in the whole stream.
It returns a PCollection<CountMinSketch> that can be queried in order to obtain estimations of the elements' frequencies.
Type Parameters:
InputT - the type of the elements in the input PCollection
perKey
Like globally() but per key, i.e. a Count-min sketch per key in PCollection<KV<K, V>>, and returns a PCollection<KV<K, CountMinSketch>>.
Type Parameters:
K - type of the keys mapping the elements
V - type of the values being combined per key