# Source code for apache_beam.coders.slow_stream
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A pure Python implementation of stream.pyx.
For internal use only; no backwards-compatibility guarantees.
"""
import struct
class OutputStream(object):
  """For internal use only; no backwards-compatibility guarantees.

  A pure Python implementation of stream.OutputStream.

  Written chunks are appended to ``self.data`` and concatenated by ``get()``.
  ``self.byte_count`` tracks the total length of everything written so that
  ``size()`` reports bytes rather than the number of chunks.

  NOTE(review): ``write`` asserts ``str`` and ``write_byte`` uses ``chr``, so
  this class appears to assume Python 2 byte strings; confirm before running
  on Python 3, where ``struct.pack`` returns ``bytes``.
  """

  def __init__(self):
    self.data = []
    # Running total of the lengths of all chunks appended to self.data.
    self.byte_count = 0

  def write(self, b, nested=False):
    """Appends the string ``b`` to the stream.

    Args:
      b: the string to append; must be a ``str``.
      nested: if true, the length of ``b`` is written first as a varint.
    """
    assert isinstance(b, str)
    if nested:
      self.write_var_int64(len(b))
    self.data.append(b)
    self.byte_count += len(b)

  def write_byte(self, val):
    """Appends the single character ``chr(val)`` to the stream."""
    self.data.append(chr(val))
    self.byte_count += 1

  def write_var_int64(self, v):
    """Writes ``v`` as a base-128 varint, least-significant group first.

    Negative values are mapped to their unsigned 64-bit equivalent by adding
    ``2**64`` before encoding.

    Raises:
      ValueError: if ``v`` is still not positive after adding ``2**64``.
    """
    if v < 0:
      v += 1 << 64
      if v <= 0:
        raise ValueError('Value too large (negative).')
    while True:
      bits = v & 0x7F
      v >>= 7
      if v:
        # More groups follow: set the continuation bit on this byte.
        bits |= 0x80
      self.write_byte(bits)
      if not v:
        break

  def write_bigendian_int64(self, v):
    """Writes ``v`` as a big-endian signed 8-byte integer."""
    self.write(struct.pack('>q', v))

  def write_bigendian_uint64(self, v):
    """Writes ``v`` as a big-endian unsigned 8-byte integer."""
    self.write(struct.pack('>Q', v))

  def write_bigendian_int32(self, v):
    """Writes ``v`` as a big-endian signed 4-byte integer."""
    self.write(struct.pack('>i', v))

  def write_bigendian_double(self, v):
    """Writes ``v`` as a big-endian 8-byte double."""
    self.write(struct.pack('>d', v))

  def get(self):
    """Returns everything written so far, concatenated into one string."""
    return ''.join(self.data)

  def size(self):
    """Returns the total length of the data written so far."""
    return self.byte_count
class ByteCountingOutputStream(OutputStream):
  """For internal use only; no backwards-compatibility guarantees.

  A pure Python implementation of stream.ByteCountingOutputStream.

  Instead of storing what is written, this stream only adds up how many
  bytes would have been written; the running total is kept in ``self.count``.
  """

  def __init__(self):
    # Note that we don't actually use any of the data initialized by our super.
    super(ByteCountingOutputStream, self).__init__()
    self.count = 0

  def write(self, byte_array, nested=False):
    """Adds ``len(byte_array)`` to the count without storing the data.

    Args:
      byte_array: the value whose length is counted.
      nested: if true, the varint-encoded length prefix is counted as well.
    """
    blen = len(byte_array)
    if nested:
      # The inherited varint writer emits through write_byte(), so each
      # prefix byte is counted there.
      self.write_var_int64(blen)
    self.count += blen

  def write_byte(self, _):
    """Counts one byte; the value itself is ignored."""
    self.count += 1

  def get_count(self):
    """Returns the number of bytes counted so far."""
    return self.count

  def get(self):
    """Not supported: no data is retained by this stream.

    Raises:
      NotImplementedError: always.
    """
    raise NotImplementedError

  def __str__(self):
    return '<%s %s>' % (self.__class__.__name__, self.count)
def get_varint_size(v):
  """For internal use only; no backwards-compatibility guarantees.

  Returns the size of the given integer value when encoded as a VarInt.

  Negative values are first mapped to their unsigned 64-bit equivalent by
  adding ``2**64``.

  Args:
    v: the integer to measure.

  Returns:
    The number of 7-bit groups needed to hold ``v``; always at least 1.

  Raises:
    ValueError: if ``v`` is still not positive after adding ``2**64``.
  """
  if v < 0:
    v += 1 << 64
    if v <= 0:
      raise ValueError('Value too large (negative).')
  varint_size = 0
  while True:
    # Each output byte carries 7 bits of payload.
    varint_size += 1
    v >>= 7
    if not v:
      break
  return varint_size