@Experimental public final class ApproximateDistinct extends java.lang.Object
PTransforms for computing the approximate number of distinct elements in a stream.
This class relies on the HyperLogLog algorithm, and more precisely on HyperLogLog+, Google's improved version.
The implementation comes from AddThis' Stream-lib library.
The original HyperLogLog paper is available here.
A paper from the same authors that gives a clearer view of the algorithm is available here.
Google's HyperLogLog+ version is detailed in this paper.
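To make the idea behind the sketch concrete, here is a minimal plain HyperLogLog in Java. It is an illustrative sketch only: the class name `TinyHll` and the `mix` hash are hypothetical, it is not the `HyperLogLogPlus` class from Stream-lib, and it omits the sparse representation and bias correction that HyperLogLog+ adds. It assumes `p` index bits select one of `2^p` registers, each storing the largest "rank" (position of the first 1 bit) seen in the remaining hash bits.

```java
/** Simplified HyperLogLog for illustration; NOT Stream-lib's HyperLogLogPlus. */
final class TinyHll {
  private final int p;            // number of hash bits used to pick a register
  private final byte[] registers; // m = 2^p registers, each holding a max rank

  TinyHll(int p) {
    if (p < 4 || p > 16) {
      throw new IllegalArgumentException("p must be in [4, 16]");
    }
    this.p = p;
    this.registers = new byte[1 << p];
  }

  /** Stand-in 64-bit hash (SplitMix64-style mixing); a real sketch hashes encoded elements. */
  private static long mix(long value) {
    long z = value * 0x9E3779B97F4A7C15L;
    z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
    z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
    return z ^ (z >>> 31);
  }

  /** Records one element; adding the same value again never changes the registers. */
  void add(long value) {
    long h = mix(value);
    int index = (int) (h >>> (64 - p)); // top p bits pick the register
    long rest = h << p;                 // remaining 64 - p bits
    int rank = Math.min(Long.numberOfLeadingZeros(rest) + 1, 64 - p + 1);
    if (rank > registers[index]) {
      registers[index] = (byte) rank;
    }
  }

  /** Merges another sketch of the same precision by taking the register-wise maximum. */
  void merge(TinyHll other) {
    if (other.p != p) {
      throw new IllegalArgumentException("precision mismatch");
    }
    for (int i = 0; i < registers.length; i++) {
      registers[i] = (byte) Math.max(registers[i], other.registers[i]);
    }
  }

  private static double alpha(int m) {
    switch (m) {
      case 16: return 0.673;
      case 32: return 0.697;
      case 64: return 0.709;
      default: return 0.7213 / (1 + 1.079 / m);
    }
  }

  /** Harmonic-mean estimate, with linear counting for small cardinalities. */
  long cardinality() {
    int m = registers.length;
    double sum = 0;
    int zeros = 0;
    for (byte r : registers) {
      sum += 1.0 / (1L << r);
      if (r == 0) {
        zeros++;
      }
    }
    double raw = alpha(m) * m * m / sum;
    if (raw <= 2.5 * m && zeros > 0) {
      return Math.round(m * Math.log((double) m / zeros));
    }
    return Math.round(raw);
  }
}
```

Because merging is a register-wise maximum, two sketches built on different parts of a stream combine into the sketch of the whole stream, which is the property that lets such a structure serve as a combiner's accumulator.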
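Two parameters can be tuned in order to control the computation's accuracy:
- Precision: p. In general one can expect a relative error of about 1.1 / sqrt(2^p). The value should be at least 4 to guarantee a minimal accuracy. By default, the precision is set to 12, for a relative error of around 2%.
- Sparse precision: sp. The value of sp should be greater than p, but lower than 32. By default, the sparse representation is not used (sp = 0). One should use it if the cardinality may be less than 12000.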
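As a rough illustration of these two parameters, the following sketch evaluates the 1.1 / sqrt(2^p) rule of thumb and checks the constraints stated above. The class and method names (`HllParams`, `approxRelativeError`, `isValid`) are hypothetical helpers, not part of the Beam API.

```java
/** Hypothetical helper illustrating the p and sp constraints; not part of Beam. */
final class HllParams {
  /** Approximate relative error for precision p, using the 1.1 / sqrt(2^p) rule of thumb. */
  static double approxRelativeError(int p) {
    return 1.1 / Math.sqrt(Math.pow(2, p));
  }

  /** True if p >= 4 and sp is either 0 (sparse representation disabled) or p < sp < 32. */
  static boolean isValid(int p, int sp) {
    return p >= 4 && (sp == 0 || (sp > p && sp < 32));
  }

  public static void main(String[] args) {
    // Each extra bit of precision doubles the bucket count and shrinks the error by sqrt(2).
    for (int p : new int[] {4, 8, 12, 15, 18}) {
      System.out.printf(
          "p=%d buckets=%d error~%.2f%%%n", p, 1 << p, 100 * approxRelativeError(p));
    }
  }
}
```

For the default p = 12 this gives 1.1 / 64, or about 1.7%, consistent with the "around 2%" figure above.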
There are 2 ways of using this class:
- Use the PTransforms that return a PCollection<Long> corresponding to the estimated number of distinct elements in the input PCollection of objects, or for each key in a PCollection of KVs.
- Use the ApproximateDistinct.ApproximateDistinctFn CombineFn, which is exposed in order to perform advanced processing involving the HyperLogLogPlus structure that summarizes the stream.
PCollection<Integer> input = ...;
PCollection<Long> hllSketch = input.apply(ApproximateDistinct.<Integer>globally());
PCollection<KV<Integer, String>> input = ...;
PCollection<KV<Integer, Long>> hllSketches = input.apply(ApproximateDistinct
.<Integer, String>perKey());
One can tune the precision and sparse precision parameters in order to control the accuracy
and the memory use. The tuning works exactly the same way for globally() and perKey().
int precision = 15;
int sparsePrecision = 25;
PCollection<Double> input = ...;
PCollection<Long> hllSketch = input.apply(ApproximateDistinct
.<Double>globally()
.withPrecision(precision)
.withSparsePrecision(sparsePrecision));
ApproximateDistinct.ApproximateDistinctFn CombineFn
The CombineFn does the same thing as the transforms, but it can be used in cases where you want
to manipulate the HyperLogLogPlus sketch directly, for example to store it in a
database as a backup. It can also be used in stateful processing or in CombineFns.ComposedCombineFn.
This example is deliberately simple, but it shows how to properly create an ApproximateDistinct.ApproximateDistinctFn. One must always specify a coder using the ApproximateDistinct.ApproximateDistinctFn.create(Coder) method.
PCollection<Integer> input = ...;
PCollection<HyperLogLogPlus> output = input.apply(Combine.globally(ApproximateDistinctFn
.<Integer>create(BigEndianIntegerCoder.of())));
Combine.CombineFn in a stateful ParDo
One may want to use the ApproximateDistinct.ApproximateDistinctFn in a stateful ParDo in order to perform
processing that depends on the current cardinality of the stream.
For more information about stateful processing, see the blog post on this topic here.
Here is an example of a DoFn using an ApproximateDistinct.ApproximateDistinctFn as a CombiningState:
class StatefulCardinality<V> extends DoFn<V, OutputT> {
  // The id declared here must match the @StateId used in processElement.
  @StateId("hllSketch")
  private final StateSpec<CombiningState<V, HyperLogLogPlus, HyperLogLogPlus>>
      indexSpec;

  public StatefulCardinality(ApproximateDistinctFn<V> fn) {
    indexSpec = StateSpecs.combining(fn);
  }

  @ProcessElement
  public void processElement(
      ProcessContext context,
      @StateId("hllSketch")
      CombiningState<V, HyperLogLogPlus, HyperLogLogPlus> hllSketch) {
    // Read the current estimate before adding the new element to the sketch.
    long current = MoreObjects.firstNonNull(hllSketch.getAccum().cardinality(), 0L);
    hllSketch.add(context.element());
    context.output(...);
  }
}
Then the DoFn can be called like this:
PCollection<V> input = ...;
ApproximateDistinctFn<V> myFn = ApproximateDistinctFn.create(input.getCoder());
PCollection<OutputT> output = input.apply(ParDo.of(new StatefulCardinality<>(myFn)));
RetrieveCardinality utility class
One may want to retrieve the cardinality as a long after performing some advanced processing using
the HyperLogLogPlus structure.
The RetrieveCardinality utility class provides an easy way to do so:
PCollection<MyObject> input = ...;
PCollection<HyperLogLogPlus> hll = input.apply(Combine.globally(ApproximateDistinctFn
.<MyObject>create(new MyObjectCoder())
.withSparseRepresentation(20)));
// Some advanced processing
PCollection<SomeObject> advancedResult = hll.apply(...);
PCollection<Long> cardinality = hll.apply(ApproximateDistinct.RetrieveCardinality.globally());
Consider using the HllCount.Init transform in the zetasketch extension module if
you need to create sketches compatible with Google Cloud BigQuery. For more details about using
HllCount and the zetasketch extension module, see
https://s.apache.org/hll-in-beam#bookmark=id.v6chsij1ixo7
Warning: this class is experimental. Its API is subject to change in future versions of
Beam. For example, it may be merged with the ApproximateUnique transform.
| Modifier and Type | Class and Description |
|---|---|
| static class | ApproximateDistinct.ApproximateDistinctFn<InputT>: Implements the Combine.CombineFn of ApproximateDistinct transforms. |
| static class | ApproximateDistinct.GloballyDistinct<InputT>: Implementation of globally(). |
| static class | ApproximateDistinct.HyperLogLogPlusCoder: Coder for HyperLogLogPlus class. |
| static class | ApproximateDistinct.PerKeyDistinct<K,V>: Implementation of perKey(). |
| Constructor and Description |
|---|
| ApproximateDistinct() |
| Modifier and Type | Method and Description |
|---|---|
| static <InputT> ApproximateDistinct.GloballyDistinct<InputT> | globally(): Computes the approximate number of distinct elements in the input PCollection<InputT> and returns a PCollection<Long>. |
| static <K,V> ApproximateDistinct.PerKeyDistinct<K,V> | perKey(): Like globally() but per key, i.e. computes the approximate number of distinct values per key in a PCollection<KV<K, V>> and returns PCollection<KV<K, Long>>. |
| static long | precisionForRelativeError(double relativeError): Computes the precision based on the desired relative error. |
| static double | relativeErrorForPrecision(int p) |
public static <InputT> ApproximateDistinct.GloballyDistinct<InputT> globally()
Computes the approximate number of distinct elements in the input PCollection<InputT>
and returns a PCollection<Long>.
InputT - the type of the elements in the input PCollection

public static <K,V> ApproximateDistinct.PerKeyDistinct<K,V> perKey()
Like globally() but per key, i.e. computes the approximate number of distinct values per
key in a PCollection<KV<K, V>> and returns PCollection<KV<K, Long>>.
K - type of the keys mapping the elements
V - type of the values being combined per key

public static long precisionForRelativeError(double relativeError)
Computes the precision based on the desired relative error.
According to the paper, the mean squared error is bounded by the following formula:
b(m) / sqrt(m), where m is the number of buckets used (p = log2(m)) and b(m) < 1.106 for m > 16 (and p > 4).
The more elements arrive in the PCollection, the lower the variation will
be: as with repeated throws of a die, the relative frequency of each result {1,2,3,4,5,6} will get closer to 1/6.
relativeError - the mean squared error; should be in the interval ]0,1]

public static double relativeErrorForPrecision(int p)
p - the precision, i.e. the number of bits used for indexing the buckets
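
The bound above can be inverted to pick a precision for a target error: requiring 1.106 / sqrt(2^p) <= relativeError gives p >= 2 * log2(1.106 / relativeError). The following is a minimal sketch of that arithmetic, assuming the 1.106 constant applies; the class and method names are hypothetical, the lower clamp to p = 5 is an assumption made here because the bound is stated only for p > 4, and this is not claimed to be Beam's actual implementation.

```java
/** Hypothetical illustration of inverting the b(m) / sqrt(m) bound; not Beam's code. */
final class PrecisionFromError {
  private static final double B = 1.106; // upper bound on b(m) for m > 16

  /** Smallest p (at least 5) such that 1.106 / sqrt(2^p) <= relativeError. */
  static int precisionFor(double relativeError) {
    if (!(relativeError > 0 && relativeError <= 1)) {
      throw new IllegalArgumentException("relativeError must be in ]0,1]");
    }
    double buckets = Math.pow(B / relativeError, 2); // required m = 2^p
    int p = (int) Math.ceil(Math.log(buckets) / Math.log(2));
    return Math.max(5, p); // assumption: the bound is only stated for p > 4
  }

  /** Error bound 1.106 / sqrt(2^p) for a given precision. */
  static double errorBound(int p) {
    return B / Math.sqrt(Math.pow(2, p));
  }
}
```

For example, a target of 0.02 requires about 3058 buckets, so the next power of two is 4096 and the sketch returns p = 12.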