Source code for apache_beam.utils.timestamp

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

"""Timestamp utilities.

For internal use only; no backwards-compatibility guarantees.

# pytype: skip-file
# mypy: disallow-untyped-defs

import datetime
import time
from typing import Union
from typing import overload

import dateutil.parser
import pytz
from google.protobuf import duration_pb2
from google.protobuf import timestamp_pb2

from apache_beam.portability import common_urns

# types compatible with Timestamp.of()
TimestampTypes = Union[int, float, 'Timestamp']
# types compatible with Duration.of()
DurationTypes = Union[int, float, 'Duration']
TimestampDurationTypes = Union[int, float, 'Duration', 'Timestamp']

[docs]class Timestamp(object): """Represents a Unix second timestamp with microsecond granularity. Can be treated in common timestamp arithmetic operations as a numeric type. Internally stores a time interval as an int of microseconds. This strategy is necessary since floating point values lose precision when storing values, especially after arithmetic operations (for example, 10000000 % 0.1 evaluates to 0.0999999994448885). """ def __init__( self, seconds: Union[int, float] = 0, micros: Union[int, float] = 0) -> None: if not isinstance(seconds, (int, float)): raise TypeError( 'Cannot interpret %s %s as seconds.' % (seconds, type(seconds))) if not isinstance(micros, (int, float)): raise TypeError( 'Cannot interpret %s %s as micros.' % (micros, type(micros))) self.micros = int(seconds * 1000000) + int(micros)
[docs] @staticmethod def of(seconds: TimestampTypes) -> 'Timestamp': """Return the Timestamp for the given number of seconds. If the input is already a Timestamp, the input itself will be returned. Args: seconds: Number of seconds as int, float, long, or Timestamp. Returns: Corresponding Timestamp object. """ if isinstance(seconds, Timestamp): return seconds elif isinstance(seconds, (int, float)): return Timestamp(seconds) elif isinstance(seconds, datetime.datetime): return Timestamp.from_utc_datetime(seconds) else: raise TypeError( 'Cannot interpret %s %s as Timestamp.' % (seconds, type(seconds)))
[docs] @staticmethod def now() -> 'Timestamp': return Timestamp(seconds=time.time())
@staticmethod def _epoch_datetime_utc() -> datetime.datetime: return datetime.datetime.fromtimestamp(0, pytz.utc)
[docs] @classmethod def from_utc_datetime(cls, dt: datetime.datetime) -> 'Timestamp': """Create a ``Timestamp`` instance from a ``datetime.datetime`` object. Args: dt: A ``datetime.datetime`` object in UTC (offset-aware). """ if dt.tzinfo is None: raise ValueError( "dt has no timezone info " + "(" + "#aware-and-naive-objects): %s" % dt) if dt.tzinfo != pytz.utc and dt.tzinfo != datetime.timezone.utc: raise ValueError('dt not in UTC: %s' % dt) duration = dt - cls._epoch_datetime_utc() return Timestamp(duration.total_seconds())
[docs] @classmethod def from_rfc3339(cls, rfc3339: str) -> 'Timestamp': """Create a ``Timestamp`` instance from an RFC 3339 compliant string. .. note:: All timezones are implicitly converted to UTC. Args: rfc3339: String in RFC 3339 form. """ try: dt = dateutil.parser.isoparse(rfc3339).astimezone(pytz.UTC) except ValueError as e: raise ValueError( "Could not parse RFC 3339 string '{}' due to error: '{}'.".format( rfc3339, e)) return cls.from_utc_datetime(dt)
[docs] def seconds(self) -> int: """Returns the timestamp in seconds.""" return self.micros // 1000000
[docs] def predecessor(self) -> 'Timestamp': """Returns the largest timestamp smaller than self.""" return Timestamp(micros=self.micros - 1)
[docs] def successor(self) -> 'Timestamp': """Returns the smallest timestamp larger than self.""" return Timestamp(micros=self.micros + 1)
def __repr__(self) -> str: micros = self.micros sign = '' if micros < 0: sign = '-' micros = -micros int_part = micros // 1000000 frac_part = micros % 1000000 if frac_part: return 'Timestamp(%s%d.%06d)' % (sign, int_part, frac_part) return 'Timestamp(%s%d)' % (sign, int_part)
[docs] def to_utc_datetime(self, has_tz: bool = False) -> datetime.datetime: """Returns a ``datetime.datetime`` object of UTC for this Timestamp. Note that this method returns a ``datetime.datetime`` object without a timezone info by default, as builtin `datetime.datetime.utcnow` method. If this is used as part of the processed data, one should set has_tz=True to avoid offset due to default timezone mismatch. Args: has_tz: whether the timezone info is attached, default to False. Returns: a ``datetime.datetime`` object of UTC for this Timestamp. """ # We can't easily construct a datetime object from microseconds, so we # create one at the epoch and add an appropriate timedelta interval. epoch = self._epoch_datetime_utc() if not has_tz: epoch = epoch.replace(tzinfo=None) return epoch + datetime.timedelta(microseconds=self.micros)
[docs] def to_rfc3339(self) -> str: # Append 'Z' for UTC timezone. return self.to_utc_datetime().isoformat() + 'Z'
[docs] def to_proto(self) -> timestamp_pb2.Timestamp: """Returns the `google.protobuf.timestamp_pb2` representation.""" secs = self.micros // 1000000 nanos = (self.micros % 1000000) * 1000 return timestamp_pb2.Timestamp(seconds=secs, nanos=nanos)
[docs] @staticmethod def from_proto(timestamp_proto: timestamp_pb2.Timestamp) -> 'Timestamp': """Creates a Timestamp from a `google.protobuf.timestamp_pb2`. Note that the google has a sub-second resolution of nanoseconds whereas this class has a resolution of microsends. This class will truncate the nanosecond resolution down to the microsecond. """ if timestamp_proto.nanos % 1000 != 0: # TODO( Better define # timestamps. raise ValueError( "Cannot convert from nanoseconds to microseconds " + "because this loses precision. Please make sure that " + "this is the correct behavior you want and manually " + "truncate the precision to the nearest microseconds. " + "See [] for " + "more information.") return Timestamp( seconds=timestamp_proto.seconds, micros=timestamp_proto.nanos // 1000)
def __float__(self) -> float: # Note that the returned value may have lost precision. return self.micros / 1000000 def __int__(self) -> int: # Note that the returned value may have lost precision. return self.micros // 1000000 def __eq__(self, other: object) -> bool: # Allow comparisons between Duration and Timestamp values. if isinstance(other, (Duration, Timestamp)): return self.micros == other.micros elif isinstance(other, (int, float)): return self.micros == Timestamp.of(other).micros else: # Support equality with other types return NotImplemented def __lt__(self, other: TimestampDurationTypes) -> bool: # Allow comparisons between Duration and Timestamp values. if not isinstance(other, Duration): other = Timestamp.of(other) return self.micros < other.micros def __gt__(self, other: TimestampDurationTypes) -> bool: return not (self < other or self == other) def __le__(self, other: TimestampDurationTypes) -> bool: return self < other or self == other def __ge__(self, other: TimestampDurationTypes) -> bool: return not self < other def __hash__(self) -> int: return hash(self.micros) def __add__(self, other: DurationTypes) -> 'Timestamp': other = Duration.of(other) return Timestamp(micros=self.micros + other.micros) def __radd__(self, other: DurationTypes) -> 'Timestamp': return self + other @overload def __sub__(self, other: DurationTypes) -> 'Timestamp': pass @overload def __sub__(self, other: 'Timestamp') -> 'Duration': pass def __sub__( self, other: Union[DurationTypes, 'Timestamp']) -> Union['Timestamp', 'Duration']: if isinstance(other, Timestamp): return Duration(micros=self.micros - other.micros) other = Duration.of(other) return Timestamp(micros=self.micros - other.micros) def __mod__(self, other: DurationTypes) -> 'Duration': other = Duration.of(other) return Duration(micros=self.micros % other.micros)
MIN_TIMESTAMP = Timestamp( micros=int(common_urns.constants.MIN_TIMESTAMP_MILLIS.constant) * 1000) MAX_TIMESTAMP = Timestamp( micros=int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) * 1000)
[docs]class Duration(object): """Represents a second duration with microsecond granularity. Can be treated in common arithmetic operations as a numeric type. Internally stores a time interval as an int of microseconds. This strategy is necessary since floating point values lose precision when storing values, especially after arithmetic operations (for example, 10000000 % 0.1 evaluates to 0.0999999994448885). """ def __init__( self, seconds: Union[int, float] = 0, micros: Union[int, float] = 0) -> None: self.micros = int(seconds * 1000000) + int(micros)
[docs] @staticmethod def of(seconds: DurationTypes) -> 'Duration': """Return the Duration for the given number of seconds since Unix epoch. If the input is already a Duration, the input itself will be returned. Args: seconds: Number of seconds as int, float or Duration. Returns: Corresponding Duration object. """ if isinstance(seconds, Timestamp): raise TypeError('Cannot interpret %s as Duration.' % seconds) if isinstance(seconds, Duration): return seconds return Duration(seconds)
[docs] def to_proto(self) -> duration_pb2.Duration: """Returns the `google.protobuf.duration_pb2` representation.""" secs = self.micros // 1000000 nanos = (self.micros % 1000000) * 1000 return duration_pb2.Duration(seconds=secs, nanos=nanos)
[docs] @staticmethod def from_proto(duration_proto: duration_pb2.Duration) -> 'Duration': """Creates a Duration from a `google.protobuf.duration_pb2`. Note that the google has a sub-second resolution of nanoseconds whereas this class has a resolution of microsends. This class will truncate the nanosecond resolution down to the microsecond. """ if duration_proto.nanos % 1000 != 0: # TODO( Better define # durations. raise ValueError( "Cannot convert from nanoseconds to microseconds " + "because this loses precision. Please make sure that " + "this is the correct behavior you want and manually " + "truncate the precision to the nearest microseconds. " + "See [] for " + "more information.") return Duration( seconds=duration_proto.seconds, micros=duration_proto.nanos // 1000)
def __repr__(self) -> str: micros = self.micros sign = '' if micros < 0: sign = '-' micros = -micros int_part = micros // 1000000 frac_part = micros % 1000000 if frac_part: return 'Duration(%s%d.%06d)' % (sign, int_part, frac_part) return 'Duration(%s%d)' % (sign, int_part) def __float__(self) -> float: # Note that the returned value may have lost precision. return self.micros / 1000000 def __eq__(self, other: object) -> bool: # Allow comparisons between Duration and Timestamp values. if isinstance(other, (Duration, Timestamp)): return self.micros == other.micros elif isinstance(other, (int, float)): return self.micros == Duration.of(other).micros else: # Support equality with other types return NotImplemented def __lt__(self, other: TimestampDurationTypes) -> bool: # Allow comparisons between Duration and Timestamp values. if not isinstance(other, Timestamp): other = Duration.of(other) return self.micros < other.micros def __gt__(self, other: TimestampDurationTypes) -> bool: return not (self < other or self == other) def __le__(self, other: TimestampDurationTypes) -> bool: return self < other or self == other def __ge__(self, other: TimestampDurationTypes) -> bool: return not self < other def __hash__(self) -> int: return hash(self.micros) def __neg__(self) -> 'Duration': return Duration(micros=-self.micros) def __add__(self, other: DurationTypes) -> 'Duration': if isinstance(other, Timestamp): # defer to Timestamp.__add__ return NotImplemented other = Duration.of(other) return Duration(micros=self.micros + other.micros) def __radd__(self, other: DurationTypes) -> 'Duration': return self + other def __sub__(self, other: DurationTypes) -> 'Duration': other = Duration.of(other) return Duration(micros=self.micros - other.micros) def __rsub__(self, other: DurationTypes) -> 'Duration': return -(self - other) def __mul__(self, other: DurationTypes) -> 'Duration': other = Duration.of(other) return Duration(micros=self.micros * other.micros // 1000000) def __rmul__(self, other: DurationTypes) -> 'Duration': return self * other def __mod__(self, other: DurationTypes) -> 'Duration': other = Duration.of(other) return Duration(micros=self.micros % other.micros)
# The minimum granularity / interval expressible in a Timestamp / Duration # object. TIME_GRANULARITY = Duration(micros=1)