# Source code for apache_beam.transforms.cy_combiners
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A library of basic cythonized CombineFn subclasses.
For internal use only; no backwards-compatibility guarantees.
"""
from __future__ import absolute_import
from apache_beam.transforms import core
class AccumulatorCombineFn(core.CombineFn):
    """CombineFn that delegates accumulator handling to an accumulator class.

    Subclasses set the class attribute ``_accumulator_type`` to a class that
    can be constructed with no arguments and provides ``merge(accumulators)``.
    Two instances compare equal, and hash alike, when they share the very
    same accumulator class.

    NOTE(review): per-element ``add_input`` and ``extract_output`` are not
    defined on this class here; presumably they come from ``core.CombineFn``
    or were lost when this listing was extracted -- confirm against the
    canonical file.
    """

    # singleton?
    def create_accumulator(self):
        """Return a new, empty accumulator of ``_accumulator_type``."""
        return self._accumulator_type()

    # Instance method: it reads ``self._accumulator_type``, so it must not be
    # wrapped in ``staticmethod``.
    def merge_accumulators(self, accumulators):
        """Merge ``accumulators`` into a fresh accumulator and return it.

        Args:
          accumulators: iterable of accumulators accepted by the
            ``merge`` method of ``_accumulator_type``.

        Returns:
          A newly created accumulator holding the merged state.
        """
        merged = self._accumulator_type()
        merged.merge(accumulators)
        return merged

    def __eq__(self, other):
        """Equal when ``other`` is an AccumulatorCombineFn with the same type."""
        return (isinstance(other, AccumulatorCombineFn)
                and self._accumulator_type is other._accumulator_type)

    def __ne__(self, other):
        """Inverse of ``__eq__``; Python 2 does not derive ``!=`` from it."""
        return not self == other

    def __hash__(self):
        """Hash on the accumulator class, consistent with ``__eq__``."""
        return hash(self._accumulator_type)
# Exponent kept in a name so that 2**63 is never written out as a literal.
_63 = 63 # Avoid large literals in C source code.
# Bounds of a signed 64-bit integer, stored in the module namespace through
# globals() instead of a plain assignment.
# NOTE(review): presumably the globals() indirection exists for the compiled
# (Cython) build of this module -- confirm before simplifying.
globals()['INT64_MAX'] = 2**_63 - 1
globals()['INT64_MIN'] = -2**_63
class CountAccumulator(object):
    """Accumulator that keeps a running count in ``value`` (starts at 0)."""

    def __init__(self):
        self.value = 0

    def merge(self, accumulators):
        """Add the ``value`` of each accumulator in ``accumulators`` to ours.

        Args:
          accumulators: iterable of objects exposing a numeric ``value``.
        """
        for other in accumulators:
            self.value += other.value
class SumInt64Accumulator(object):
    """Accumulator that keeps a running integer sum in ``value`` (starts at 0)."""

    def __init__(self):
        self.value = 0

    def merge(self, accumulators):
        """Add the ``value`` of each accumulator in ``accumulators`` to ours.

        Args:
          accumulators: iterable of objects exposing a numeric ``value``.
        """
        for other in accumulators:
            self.value += other.value
class MinInt64Accumulator(object):
    """Accumulator that tracks the smallest value seen in ``value``.

    Starts at ``INT64_MAX`` so that any smaller merged value replaces it.
    """

    def __init__(self):
        self.value = INT64_MAX

    def merge(self, accumulators):
        """Lower ``value`` to the smallest ``value`` among ``accumulators``.

        Args:
          accumulators: iterable of objects exposing a comparable ``value``.
        """
        for other in accumulators:
            # Strict comparison: ties leave the current value untouched.
            if other.value < self.value:
                self.value = other.value
class MaxInt64Accumulator(object):
    """Accumulator that tracks the largest value seen in ``value``.

    Starts at ``INT64_MIN`` so that any larger merged value replaces it.
    """

    def __init__(self):
        self.value = INT64_MIN

    def merge(self, accumulators):
        """Raise ``value`` to the largest ``value`` among ``accumulators``.

        Args:
          accumulators: iterable of objects exposing a comparable ``value``.
        """
        for other in accumulators:
            # Strict comparison: ties leave the current value untouched.
            if other.value > self.value:
                self.value = other.value
class MeanInt64Accumulator(object):
    """Accumulator holding the ``sum`` and ``count`` needed for a mean.

    Both fields start at 0.
    """

    def __init__(self):
        self.sum = 0
        self.count = 0

    def merge(self, accumulators):
        """Add the ``sum`` and ``count`` of each accumulator to ours.

        Args:
          accumulators: iterable of objects exposing ``sum`` and ``count``.
        """
        for other in accumulators:
            self.sum += other.sum
            self.count += other.count
class CountCombineFn(AccumulatorCombineFn):
    """AccumulatorCombineFn whose accumulators are ``CountAccumulator``."""

    _accumulator_type = CountAccumulator
class SumInt64Fn(AccumulatorCombineFn):
    """AccumulatorCombineFn whose accumulators are ``SumInt64Accumulator``."""

    _accumulator_type = SumInt64Accumulator
class MinInt64Fn(AccumulatorCombineFn):
    """AccumulatorCombineFn whose accumulators are ``MinInt64Accumulator``."""

    _accumulator_type = MinInt64Accumulator
class MaxInt64Fn(AccumulatorCombineFn):
    """AccumulatorCombineFn whose accumulators are ``MaxInt64Accumulator``."""

    _accumulator_type = MaxInt64Accumulator
class MeanInt64Fn(AccumulatorCombineFn):
    """AccumulatorCombineFn whose accumulators are ``MeanInt64Accumulator``."""

    _accumulator_type = MeanInt64Accumulator
# Special float values used by the double accumulators: the two infinities
# seed the min/max accumulators and NaN is the mean of zero elements.
_POS_INF, _NEG_INF, _NAN = map(float, ('inf', '-inf', 'nan'))
class SumDoubleAccumulator(object):
    """Accumulator that keeps a running sum in ``value`` (starts at 0)."""

    def __init__(self):
        self.value = 0

    def merge(self, accumulators):
        """Add the ``value`` of each accumulator in ``accumulators`` to ours.

        Args:
          accumulators: iterable of objects exposing a numeric ``value``.
        """
        for other in accumulators:
            self.value += other.value
class MinDoubleAccumulator(object):
    """Accumulator that tracks the smallest value seen in ``value``.

    Starts at ``_POS_INF`` so that any smaller merged value replaces it.
    """

    def __init__(self):
        self.value = _POS_INF

    def merge(self, accumulators):
        """Lower ``value`` to the smallest ``value`` among ``accumulators``.

        Args:
          accumulators: iterable of objects exposing a comparable ``value``.
        """
        for other in accumulators:
            # Strict comparison: ties leave the current value untouched.
            if other.value < self.value:
                self.value = other.value
class MaxDoubleAccumulator(object):
    """Accumulator that tracks the largest value seen in ``value``.

    Starts at ``_NEG_INF`` so that any larger merged value replaces it.
    """

    def __init__(self):
        self.value = _NEG_INF

    def merge(self, accumulators):
        """Raise ``value`` to the largest ``value`` among ``accumulators``.

        Args:
          accumulators: iterable of objects exposing a comparable ``value``.
        """
        for other in accumulators:
            # Strict comparison: ties leave the current value untouched.
            if other.value > self.value:
                self.value = other.value
class MeanDoubleAccumulator(object):
    """Accumulator holding the ``sum`` and ``count`` needed for a mean.

    Both fields start at 0.
    """

    def __init__(self):
        self.sum = 0
        self.count = 0

    def merge(self, accumulators):
        """Add the ``sum`` and ``count`` of each accumulator to ours.

        Args:
          accumulators: iterable of objects exposing ``sum`` and ``count``.
        """
        for other in accumulators:
            self.sum += other.sum
            self.count += other.count

    def extract_output(self):
        """Return ``sum / count``, or ``_NAN`` when ``count`` is zero."""
        if not self.count:
            # Nothing accumulated: avoid dividing by zero.
            return _NAN
        return self.sum / self.count
class SumFloatFn(AccumulatorCombineFn):
    """AccumulatorCombineFn whose accumulators are ``SumDoubleAccumulator``."""

    _accumulator_type = SumDoubleAccumulator
class MinFloatFn(AccumulatorCombineFn):
    """AccumulatorCombineFn whose accumulators are ``MinDoubleAccumulator``."""

    _accumulator_type = MinDoubleAccumulator
class MaxFloatFn(AccumulatorCombineFn):
    """AccumulatorCombineFn whose accumulators are ``MaxDoubleAccumulator``."""

    _accumulator_type = MaxDoubleAccumulator
class MeanFloatFn(AccumulatorCombineFn):
    """AccumulatorCombineFn whose accumulators are ``MeanDoubleAccumulator``."""

    _accumulator_type = MeanDoubleAccumulator
class AllAccumulator(object):
    """Accumulator whose ``value`` is the AND of merged values (starts True)."""

    def __init__(self):
        self.value = True

    def merge(self, accumulators):
        """AND the ``value`` of each accumulator in ``accumulators`` into ours.

        Args:
          accumulators: iterable of objects exposing a ``value`` usable
            with the ``&`` operator.
        """
        for other in accumulators:
            self.value &= other.value
class AnyAccumulator(object):
    """Accumulator whose ``value`` is the OR of merged values (starts False)."""

    def __init__(self):
        self.value = False

    def merge(self, accumulators):
        """OR the ``value`` of each accumulator in ``accumulators`` into ours.

        Args:
          accumulators: iterable of objects exposing a ``value`` usable
            with the ``|`` operator.
        """
        for other in accumulators:
            self.value |= other.value
class AnyCombineFn(AccumulatorCombineFn):
    """AccumulatorCombineFn whose accumulators are ``AnyAccumulator``."""

    _accumulator_type = AnyAccumulator
class AllCombineFn(AccumulatorCombineFn):
    """AccumulatorCombineFn whose accumulators are ``AllAccumulator``."""

    _accumulator_type = AllAccumulator