#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""BigTable connector
This module implements writing to BigTable tables.
The default mode is to set row data to write to BigTable tables.
The syntax supported is described here:
https://cloud.google.com/bigtable/docs/quickstart-cbt
BigTable connector can be used as main outputs. A main output
(common case) is expected to be massive and will be split into
manageable chunks and processed in parallel. In the example below
we created a list of rows then passed to the GeneratedDirectRows
DoFn to set the Cells and then we call the BigTableWriteFn to insert
those generated rows in the table.
main_table = (p
| beam.Create(self._generate())
| WriteToBigTable(project_id,
instance_id,
table_id))
"""
# pytype: skip-file
import logging
import apache_beam as beam
from apache_beam.internal.metrics.metric import ServiceCallMetric
from apache_beam.io.gcp import resource_identifiers
from apache_beam.metrics import Metrics
from apache_beam.metrics import monitoring_infos
from apache_beam.transforms.display import DisplayDataItem
_LOGGER = logging.getLogger(__name__)
try:
from google.cloud.bigtable import Client
from google.cloud.bigtable.batcher import MutationsBatcher
FLUSH_COUNT = 1000
MAX_ROW_BYTES = 5242880 # 5MB
class _MutationsBatcher(MutationsBatcher):
def __init__(
self, table, flush_count=FLUSH_COUNT, max_row_bytes=MAX_ROW_BYTES):
super().__init__(table, flush_count, max_row_bytes)
self.rows = []
def set_flush_callback(self, callback_fn):
self.callback_fn = callback_fn
def flush(self):
if len(self.rows) != 0:
status_list = self.table.mutate_rows(self.rows)
self.callback_fn(status_list)
# If even one request fails we retry everything. BigTable mutations are
# idempotent so this should be correct.
# TODO(https://github.com/apache/beam/issues/21396): make this more
# efficient by retrying only re-triable failed requests.
for status in status_list:
if not status:
# BigTable client may return 'None' instead of a valid status in
# some cases due to
# https://github.com/googleapis/python-bigtable/issues/485
raise Exception(
'Failed to write a batch of %r records' % len(self.rows))
elif status.code != 0:
raise Exception(
'Failed to write a batch of %r records due to %r' % (
len(self.rows),
ServiceCallMetric.bigtable_error_code_to_grpc_status_string(
status.code)))
self.total_mutation_count = 0
self.total_size = 0
self.rows = []
except ImportError:
_LOGGER.warning(
'ImportError: from google.cloud.bigtable import Client', exc_info=True)
__all__ = ['WriteToBigTable']
class _BigTableWriteFn(beam.DoFn):
""" Creates the connector can call and add_row to the batcher using each
row in beam pipe line
Args:
project_id(str): GCP Project ID
instance_id(str): GCP Instance ID
table_id(str): GCP Table ID
"""
def __init__(self, project_id, instance_id, table_id):
""" Constructor of the Write connector of Bigtable
Args:
project_id(str): GCP Project of to write the Rows
instance_id(str): GCP Instance to write the Rows
table_id(str): GCP Table to write the `DirectRows`
"""
super().__init__()
self.beam_options = {
'project_id': project_id,
'instance_id': instance_id,
'table_id': table_id
}
self.table = None
self.batcher = None
self.service_call_metric = None
self.written = Metrics.counter(self.__class__, 'Written Row')
def __getstate__(self):
return self.beam_options
def __setstate__(self, options):
self.beam_options = options
self.table = None
self.batcher = None
self.service_call_metric = None
self.written = Metrics.counter(self.__class__, 'Written Row')
def write_mutate_metrics(self, status_list):
for status in status_list:
code = status.code if status else None
grpc_status_string = (
ServiceCallMetric.bigtable_error_code_to_grpc_status_string(code))
self.service_call_metric.call(grpc_status_string)
def start_service_call_metrics(self, project_id, instance_id, table_id):
resource = resource_identifiers.BigtableTable(
project_id, instance_id, table_id)
labels = {
monitoring_infos.SERVICE_LABEL: 'BigTable',
# TODO(JIRA-11985): Add Ptransform label.
monitoring_infos.METHOD_LABEL: 'google.bigtable.v2.MutateRows',
monitoring_infos.RESOURCE_LABEL: resource,
monitoring_infos.BIGTABLE_PROJECT_ID_LABEL: (
self.beam_options['project_id']),
monitoring_infos.INSTANCE_ID_LABEL: self.beam_options['instance_id'],
monitoring_infos.TABLE_ID_LABEL: self.beam_options['table_id']
}
return ServiceCallMetric(
request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
base_labels=labels)
def start_bundle(self):
if self.table is None:
client = Client(project=self.beam_options['project_id'])
instance = client.instance(self.beam_options['instance_id'])
self.table = instance.table(self.beam_options['table_id'])
self.service_call_metric = self.start_service_call_metrics(
self.beam_options['project_id'],
self.beam_options['instance_id'],
self.beam_options['table_id'])
self.batcher = _MutationsBatcher(self.table)
self.batcher.set_flush_callback(self.write_mutate_metrics)
def process(self, row):
self.written.inc()
# You need to set the timestamp in the cells in this row object,
# when we do a retry we will mutating the same object, but, with this
# we are going to set our cell with new values.
# Example:
# direct_row.set_cell('cf1',
# 'field1',
# 'value1',
# timestamp=datetime.datetime.now())
self.batcher.mutate(row)
def finish_bundle(self):
self.batcher.flush()
self.batcher = None
def display_data(self):
return {
'projectId': DisplayDataItem(
self.beam_options['project_id'], label='Bigtable Project Id'),
'instanceId': DisplayDataItem(
self.beam_options['instance_id'], label='Bigtable Instance Id'),
'tableId': DisplayDataItem(
self.beam_options['table_id'], label='Bigtable Table Id')
}
[docs]class WriteToBigTable(beam.PTransform):
""" A transform to write to the Bigtable Table.
A PTransform that write a list of `DirectRow` into the Bigtable Table
"""
def __init__(self, project_id=None, instance_id=None, table_id=None):
""" The PTransform to access the Bigtable Write connector
Args:
project_id(str): GCP Project of to write the Rows
instance_id(str): GCP Instance to write the Rows
table_id(str): GCP Table to write the `DirectRows`
"""
super().__init__()
self.beam_options = {
'project_id': project_id,
'instance_id': instance_id,
'table_id': table_id
}
[docs] def expand(self, pvalue):
beam_options = self.beam_options
return (
pvalue
| beam.ParDo(
_BigTableWriteFn(
beam_options['project_id'],
beam_options['instance_id'],
beam_options['table_id'])))