Source code for apache_beam.io.gcp.bigquery_read_internal

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Internal library for reading data from BigQuery.

NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.
"""
import collections
import decimal
import json
import logging
import secrets
import time
import uuid
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Union

import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.io.avroio import _create_avro_source
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
from apache_beam.io.iobase import BoundedSource
from apache_beam.io.textio import _TextSource
from apache_beam.metrics.metric import Lineage
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import ValueProvider
from apache_beam.transforms import PTransform

if TYPE_CHECKING:
  from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest

try:
  from apache_beam.io.gcp.internal.clients.bigquery import DatasetReference
  from apache_beam.io.gcp.internal.clients.bigquery import TableReference
except ImportError:
  DatasetReference = None
  TableReference = None

_LOGGER = logging.getLogger(__name__)


def bigquery_export_destination_uri(
    gcs_location_vp: Union[str, Optional[ValueProvider]],
    temp_location: Optional[str],
    unique_id: str,
    directory_only: bool = False,
) -> str:
  """Builds the Google Cloud Storage URI an extracted table is written to.

  Args:
    gcs_location_vp: Explicit GCS base location, either a plain string or a
      ValueProvider that is resolved with ``.get()``. May be None.
    temp_location: Fallback base location, used only when no explicit
      location resolves to a non-None value.
    unique_id: Sub-directory name placed under the base location. A falsy
      value is replaced by a freshly generated ``uuid4`` hex string.
    directory_only: When True, the returned URI stops at the unique
      directory; otherwise the ``bigquery-table-dump-*.json`` file pattern is
      appended.

  Returns:
    The joined URI, produced by ``FileSystems.join``.

  Raises:
    ValueError: If neither an explicit location nor ``temp_location`` is
      available.
  """
  # Unwrap the ValueProvider (if any); a None input simply stays None.
  if isinstance(gcs_location_vp, ValueProvider):
    resolved_location = gcs_location_vp.get()
  else:
    resolved_location = gcs_location_vp

  if resolved_location is None and temp_location is None:
    raise ValueError(
        'ReadFromBigQuery requires a GCS location to be provided. Neither '
        'gcs_location in the constructor nor the fallback option '
        '--temp_location is set.')

  if resolved_location is not None:
    base_dir = resolved_location
  else:
    base_dir = temp_location
    _LOGGER.debug("gcs_location is empty, using temp_location instead")

  path_parts = [base_dir, unique_id or uuid.uuid4().hex]
  if not directory_only:
    path_parts.append('bigquery-table-dump-*.json')
  return FileSystems.join(*path_parts)
class _PassThroughThenCleanup(PTransform):
  """A PTransform that invokes a cleanup DoFn after the input PCollection has
  been processed.

  Every input element is re-emitted unchanged on the ``main`` output. The
  cleanup DoFn runs over a separate single-element PCollection and receives
  the arguments (element, cleanup_signal, side_input), where
  ``cleanup_signal`` is the ``cleanup_signal`` tagged output of the
  pass-through step taken as a singleton side input.

  Utilizes readiness of PCollection to trigger DoFn.
  """
  def __init__(self, side_input=None):
    # Side input handed to the cleanup DoFn; it is iterated there as the
    # collection of GCS locations to delete.
    self.side_input = side_input

  def expand(self, input):
    class PassThrough(beam.DoFn):
      """Re-emits each element unchanged."""
      def process(self, element):
        yield element

    class RemoveExtractedFiles(beam.DoFn):
      """Deletes every path found in the ``gcs_locations`` side input."""
      def process(self, unused_element, unused_signal, gcs_locations):
        FileSystems.delete(list(gcs_locations))

    # Nothing is ever written to 'cleanup_signal'; it is only declared so it
    # can be consumed as a side input by the cleanup step below.
    main_output, cleanup_signal = input | beam.ParDo(
        PassThrough()).with_outputs(
            'cleanup_signal', main='main')

    # A single dummy element so that the cleanup DoFn is invoked once.
    cleanup_input = input.pipeline | beam.Create([None])

    _ = cleanup_input | beam.ParDo(
        RemoveExtractedFiles(),
        beam.pvalue.AsSingleton(cleanup_signal),
        self.side_input,
    )

    return main_output


class _PassThroughThenCleanupTempDatasets(PTransform):
  """A PTransform that invokes a cleanup DoFn after the input PCollection has
  been processed.

  Every input element is re-emitted unchanged on the ``main`` output. The
  cleanup DoFn runs over a separate single-element PCollection and receives
  the arguments (element, cleanup_signal, side_input), where
  ``cleanup_signal`` is the ``cleanup_signal`` tagged output of the
  pass-through step taken as a singleton side input.

  Utilizes readiness of PCollection to trigger DoFn.
  """
  def __init__(self, side_input=None):
    # Side input handed to the cleanup DoFn; its first item is read there as
    # a mapping describing what has to be cleaned up.
    self.side_input = side_input

  def expand(self, input):
    # Captured by the nested DoFn below to build a BigQueryWrapper.
    pipeline_options = input.pipeline.options

    class PassThrough(beam.DoFn):
      """Re-emits each element unchanged."""
      def process(self, element):
        yield element

    class CleanUpProjects(beam.DoFn):
      """Removes Beam-labelled temporary datasets described by the side input.
      """
      def process(self, unused_element, unused_signal, pipeline_details):
        bq = bigquery_tools.BigQueryWrapper.from_pipeline_options(
            pipeline_options)
        # Only the first entry of the side input is used.
        pipeline_details = pipeline_details[0]
        if 'temp_table_ref' in pipeline_details:
          temp_table_ref = pipeline_details['temp_table_ref']
          bq._clean_up_beam_labelled_temporary_datasets(
              project_id=temp_table_ref.projectId,
              dataset_id=temp_table_ref.datasetId,
              table_id=temp_table_ref.tableId)
        elif 'project_id' in pipeline_details:
          bq._clean_up_beam_labelled_temporary_datasets(
              project_id=pipeline_details['project_id'],
              labels=pipeline_details['bigquery_dataset_labels'])

    # Nothing is ever written to 'cleanup_signal'; it is only declared so it
    # can be consumed as a side input by the cleanup step below.
    main_output, cleanup_signal = input | beam.ParDo(
        PassThrough()).with_outputs(
            'cleanup_signal', main='main')

    # A single dummy element so that the cleanup DoFn is invoked once.
    cleanup_input = input.pipeline | beam.Create([None])

    _ = cleanup_input | beam.ParDo(
        CleanUpProjects(),
        beam.pvalue.AsSingleton(cleanup_signal),
        self.side_input,
    )

    return main_output


class _BigQueryReadSplit(beam.transforms.DoFn):
  """Starts the process of reading from BigQuery.

  This transform will start a BigQuery export job, and output a number of
  file sources that are consumed downstream.
  """
  def __init__(
      self,
      options: PipelineOptions,
      gcs_location: Optional[Union[str, ValueProvider]] = None,
      use_json_exports: bool = False,
      bigquery_job_labels: Optional[Dict[str, str]] = None,
      step_name: Optional[str] = None,
      job_name: Optional[str] = None,
      unique_id: Optional[str] = None,
      kms_key: Optional[str] = None,
      project: Optional[str] = None,
      temp_dataset: Optional[Union[str, DatasetReference]] = None,
      query_priority: Optional[str] = None):
    """Stores the configuration used later by the bundle/process methods.

    Args:
      options: Pipeline options; read for the BigQuery client, the GCP
        project and the temp location.
      gcs_location: Base GCS location for exported files (string or
        ValueProvider). When None, the pipeline's temp location is used.
      use_json_exports: Export as JSON (read back with a text source) instead
        of Avro.
      bigquery_job_labels: Extra labels attached to the BigQuery jobs;
        defaults to an empty dict.
      step_name: Step name passed on when creating the BigQuery IO metadata.
      job_name: Name used when generating BigQuery job names; defaults to
        'BQ_READ_SPLIT'.
      unique_id: Identifier used in generated job names and in the export
        directory name.
      kms_key: KMS key forwarded to the query job.
      project: Fallback project, used when the pipeline options carry none.
      temp_dataset: Temporary dataset, as a dataset id string or a
        DatasetReference.
      query_priority: Priority forwarded to the query job.
    """
    self.options = options
    self.use_json_exports = use_json_exports
    self.gcs_location = gcs_location
    self.bigquery_job_labels = bigquery_job_labels or {}
    self._step_name = step_name
    self._job_name = job_name or 'BQ_READ_SPLIT'
    self._source_uuid = unique_id
    self.kms_key = kms_key
    self.project = project
    self.temp_dataset = temp_dataset
    self.query_priority = query_priority
    # Created lazily by _get_bq_metadata().
    self.bq_io_metadata = None

  def display_data(self):
    """Returns the configuration of this DoFn as a dict of strings."""
    return {
        'use_json_exports': str(self.use_json_exports),
        'gcs_location': str(self.gcs_location),
        'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
        'kms_key': str(self.kms_key),
        'project': str(self.project),
        'temp_dataset': str(self.temp_dataset)
    }

  def _get_temp_dataset_id(self):
    """Returns the temporary dataset id as a string, or None if unset.

    Raises:
      ValueError: If ``temp_dataset`` is neither a str nor a
        DatasetReference.
    """
    if self.temp_dataset is None:
      return None
    elif isinstance(self.temp_dataset, DatasetReference):
      return self.temp_dataset.datasetId
    elif isinstance(self.temp_dataset, str):
      return self.temp_dataset
    else:
      raise ValueError("temp_dataset has to be either str or DatasetReference")

  def start_bundle(self):
    """Creates a fresh BigQueryWrapper for the bundle."""
    self.bq = bigquery_tools.BigQueryWrapper(
        temp_dataset_id=self._get_temp_dataset_id(),
        client=bigquery_tools.BigQueryWrapper._bigquery_client(self.options))

  def process(self,
              element: 'ReadFromBigQueryRequest') -> Iterable[BoundedSource]:
    """Exports the requested table or query result and yields file sources.

    For a query request the query is first executed into a temporary table
    (creating the temporary dataset if this wrapper has not created one
    yet); that table is deleted once all sources have been yielded. For a
    table request the table reference is parsed from ``element.table``.

    Args:
      element: The read request; its ``query``, ``table``,
        ``use_standard_sql``, ``flatten_results`` and ``obj_id`` attributes
        are used.

    Yields:
      One source per exported file.
    """
    if element.query is not None:
      if not self.bq.created_temp_dataset:
        self._setup_temporary_dataset(self.bq, element)
      table_reference = self._execute_query(self.bq, element)
    else:
      assert element.table
      table_reference = bigquery_tools.parse_table_reference(
          element.table, project=self._get_project())

    if not table_reference.projectId:
      table_reference.projectId = self._get_project()

    schema, metadata_list = self._export_files(
        self.bq, element, table_reference)

    for metadata in metadata_list:
      yield self._create_source(metadata.path, schema)

    Lineage.sources().add(
        'bigquery',
        table_reference.projectId,
        table_reference.datasetId,
        table_reference.tableId)

    # The table holding a query result is temporary; drop it after export.
    if element.query is not None:
      self.bq._delete_table(
          table_reference.projectId,
          table_reference.datasetId,
          table_reference.tableId)

  def finish_bundle(self):
    """Cleans up the temporary dataset if this bundle created one."""
    if self.bq.created_temp_dataset:
      self.bq.clean_up_temporary_dataset(self._get_project())

  def _get_bq_metadata(self):
    """Returns the BigQuery IO metadata, creating it on first use."""
    if not self.bq_io_metadata:
      self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
    return self.bq_io_metadata

  def _create_source(self, path, schema):
    """Returns a source for one exported file.

    An Avro source is used unless JSON exports were requested, in which case
    a text source decoding each line with _JsonToDictCoder is returned.
    """
    if not self.use_json_exports:
      return _create_avro_source(path)
    else:
      return _TextSource(
          path,
          min_bundle_size=0,
          compression_type=CompressionTypes.UNCOMPRESSED,
          strip_trailing_newlines=True,
          coder=_JsonToDictCoder(schema))

  def _setup_temporary_dataset(
      self,
      bq: bigquery_tools.BigQueryWrapper,
      element: 'ReadFromBigQueryRequest'):
    """Creates the temporary dataset in the location of the query."""
    location = bq.get_query_location(
        self._get_project(), element.query, not element.use_standard_sql)
    bq.create_temporary_dataset(self._get_project(), location)

  def _execute_query(
      self,
      bq: bigquery_tools.BigQueryWrapper,
      element: 'ReadFromBigQueryRequest'):
    """Runs the query of ``element`` and returns the temporary table.

    Returns:
      The value of ``bq._get_temp_table`` for the billing project.
    """
    # The current timestamp plus a random token is the last component passed
    # to the job-name generator.
    query_job_name = bigquery_tools.generate_bq_job_name(
        self._job_name,
        self._source_uuid,
        bigquery_tools.BigQueryJobTypes.QUERY,
        '%s_%s' % (int(time.time()), secrets.token_hex(3)))
    job = bq._start_query_job(
        self._get_project(),
        element.query,
        not element.use_standard_sql,
        element.flatten_results,
        job_id=query_job_name,
        priority=self.query_priority,
        kms_key=self.kms_key,
        job_labels=self._get_bq_metadata().add_additional_bq_job_labels(
            self.bigquery_job_labels))
    job_ref = job.jobReference
    bq.wait_for_bq_job(job_ref, max_retries=0)
    return bq._get_temp_table(self._get_project())

  def _export_files(
      self,
      bq: bigquery_tools.BigQueryWrapper,
      element: 'ReadFromBigQueryRequest',
      table_reference: TableReference):
    """Runs a BigQuery export job.

    Any exception raised while starting or waiting for the export job is
    logged with a hint about external tables and then re-raised.

    Returns:
      bigquery.TableSchema instance, a list of FileMetadata instances
    """
    job_labels = self._get_bq_metadata().add_additional_bq_job_labels(
        self.bigquery_job_labels)
    export_job_name = bigquery_tools.generate_bq_job_name(
        self._job_name,
        self._source_uuid,
        bigquery_tools.BigQueryJobTypes.EXPORT,
        element.obj_id)
    temp_location = self.options.view_as(GoogleCloudOptions).temp_location
    gcs_location = bigquery_export_destination_uri(
        self.gcs_location,
        temp_location,
        '%s%s' % (self._source_uuid, element.obj_id))
    try:
      if self.use_json_exports:
        job_ref = bq.perform_extract_job([gcs_location],
                                         export_job_name,
                                         table_reference,
                                         bigquery_tools.FileFormat.JSON,
                                         project=self._get_project(),
                                         job_labels=job_labels,
                                         include_header=False)
      else:
        job_ref = bq.perform_extract_job([gcs_location],
                                         export_job_name,
                                         table_reference,
                                         bigquery_tools.FileFormat.AVRO,
                                         project=self._get_project(),
                                         include_header=False,
                                         job_labels=job_labels,
                                         use_avro_logical_types=True)
      bq.wait_for_bq_job(job_ref)
    except Exception as exn:  # pylint: disable=broad-except
      # The error messages thrown in this case are generic and misleading,
      # so leave this breadcrumb in case it's the root cause.
      _LOGGER.warning(
          "Error exporting table: %s. "
          "Note that external tables cannot be exported: "
          "https://cloud.google.com/bigquery/docs/external-tables"
          "#external_table_limitations",
          exn)
      raise
    metadata_list = FileSystems.match([gcs_location])[0].metadata_list

    if isinstance(table_reference, ValueProvider):
      table_ref = bigquery_tools.parse_table_reference(
          element.table, project=self._get_project())
    else:
      table_ref = table_reference
    table = bq.get_table(
        table_ref.projectId, table_ref.datasetId, table_ref.tableId)

    return table.schema, metadata_list

  def _get_project(self):
    """Returns the project that queries and exports will be billed to.

    The project from the pipeline options wins (resolved first if it is a
    ValueProvider); ``self.project`` is used only when that is empty.
    """
    project = self.options.view_as(GoogleCloudOptions).project
    if isinstance(project, ValueProvider):
      project = project.get()
    if not project:
      project = self.project
    return project


# Plain-tuple mirror of the TableFieldSchema attributes used by the coder.
FieldSchema = collections.namedtuple('FieldSchema', 'fields mode name type')


class _JsonToDictCoder(coders.Coder):
  """A coder for a JSON string to a Python dict."""
  def __init__(self, table_schema):
    self.fields = self._convert_to_tuple(table_schema.fields)
    # Field type name -> callable converting the JSON value. Types without an
    # entry are returned as decoded by the json module.
    self._converters = {
        'INTEGER': int,
        'INT64': int,
        'FLOAT': float,
        'FLOAT64': float,
        'NUMERIC': self._to_decimal,
        'BYTES': self._to_bytes,
    }

  @staticmethod
  def _to_decimal(value):
    """Converts value to a decimal.Decimal."""
    return decimal.Decimal(value)

  @staticmethod
  def _to_bytes(value):
    """Converts value from str to bytes."""
    return value.encode('utf-8')

  @classmethod
  def _convert_to_tuple(cls, table_field_schemas):
    """Recursively converts the list of TableFieldSchema instances to the
    list of tuples to prevent errors when pickling and unpickling
    TableFieldSchema instances.
    """
    if not table_field_schemas:
      return []

    return [
        FieldSchema(cls._convert_to_tuple(x.fields), x.mode, x.name, x.type)
        for x in table_field_schemas
    ]

  def decode(self, value):
    """Decodes UTF-8 encoded JSON bytes into a dict converted per schema."""
    value = json.loads(value.decode('utf-8'))
    return self._decode_row(value, self.fields)

  def _decode_row(self, row: Dict[str, Any], schema_fields: List[FieldSchema]):
    """Converts the values of ``row`` in place and returns ``row``.

    Fields present in the schema but absent from the row are set to None.
    Each element of a REPEATED field is converted individually.
    """
    for field in schema_fields:
      if field.name not in row:
        # The field exists in the schema, but it doesn't exist in this row.
        # It probably means its value was null, as the extract to JSON job
        # doesn't preserve null fields
        row[field.name] = None
        continue

      if field.mode == 'REPEATED':
        for i, elem in enumerate(row[field.name]):
          row[field.name][i] = self._decode_data(elem, field)
      else:
        row[field.name] = self._decode_data(row[field.name], field)
    return row

  def _decode_data(self, obj: Any, field: FieldSchema):
    """Converts a single value according to ``field``.

    A field with nested fields is decoded as a row; otherwise the converter
    registered for the field type is applied, and values of types without a
    converter are returned unchanged.
    """
    if field.fields:
      return self._decode_row(obj, field.fields)

    # Look the converter up explicitly so that only a missing converter, and
    # not an error raised inside one, leads to the pass-through.
    converter = self._converters.get(field.type)
    if converter is None:
      # No need to do any conversion
      return obj
    return converter(obj)

  def is_deterministic(self):
    return True

  def to_type_hint(self):
    return dict