apache_beam.io.gcp package

Submodules

apache_beam.io.gcp.bigquery module

BigQuery sources and sinks.

This module implements reading from and writing to BigQuery tables. It relies on several classes exposed by the BigQuery API: TableSchema, TableFieldSchema, TableRow, and TableCell. The default mode is to return table rows read from a BigQuery source as dictionaries. Similarly a Write transform to a BigQuerySink accepts PCollections of dictionaries. This is done for more convenient programming. If desired, the native TableRow objects can be used throughout to represent rows (use an instance of TableRowJsonCoder as a coder argument when creating the sources or sinks respectively).

Also, for programming convenience, instances of TableReference and TableSchema have a string representation that can be used for the corresponding arguments:

  • TableReference can be a PROJECT:DATASET.TABLE or DATASET.TABLE string.
  • TableSchema can be a NAME:TYPE{,NAME:TYPE}* string (e.g. ‘month:STRING,event_count:INTEGER’).

The syntax supported is described here: https://cloud.google.com/bigquery/bq-command-line-tool-quickstart

BigQuery sources can be used as main inputs or side inputs. A main input (common case) is expected to be massive and will be split into manageable chunks and processed in parallel. Side inputs are expected to be small and will be read completely every time a ParDo DoFn gets executed. In the example below the lambda function implementing the DoFn for the Map transform will get on each call one row of the main table and all rows of the side table. The runner may use some caching techniques to share the side inputs between calls in order to avoid excessive reading::

main_table = pipeline | 'VeryBig' >> beam.io.Read(beam.io.BigQuerySource()
side_table = pipeline | 'NotBig' >> beam.io.Read(beam.io.BigQuerySource()
results = (
    main_table
    | 'ProcessData' >> beam.Map(
        lambda element, side_input: ..., AsList(side_table)))

There is no difference in how main and side inputs are read. What makes the side_table a ‘side input’ is the AsList wrapper used when passing the table as a parameter to the Map transform. AsList signals to the execution framework that its input should be made available whole.

The main and side inputs are implemented differently. Reading a BigQuery table as main input entails exporting the table to a set of GCS files (currently in JSON format) and then processing those files. Reading the same table as a side input entails querying the table for all its rows. The coder argument on BigQuerySource controls the reading of the lines in the export files (i.e., transform a JSON object into a PCollection element). The coder is not involved when the same table is read as a side input since there is no intermediate format involved. We get the table rows directly from the BigQuery service with a query.

Users may provide a query to read from rather than reading all of a BigQuery table. If specified, the result obtained by executing the specified query will be used as the data of the input transform.:

query_results = pipeline | beam.io.Read(beam.io.BigQuerySource(
    query='SELECT year, mean_temp FROM samples.weather_stations'))

When creating a BigQuery input transform, users should provide either a query or a table. Pipeline construction will fail with a validation error if neither or both are specified.

* Short introduction to BigQuery concepts * Tables have rows (TableRow) and each row has cells (TableCell). A table has a schema (TableSchema), which in turn describes the schema of each cell (TableFieldSchema). The terms field and cell are used interchangeably.

TableSchema: Describes the schema (types and order) for values in each row.
Has one attribute, ‘field’, which is list of TableFieldSchema objects.
TableFieldSchema: Describes the schema (type, name) for one field.
Has several attributes, including ‘name’ and ‘type’. Common values for the type attribute are: ‘STRING’, ‘INTEGER’, ‘FLOAT’, ‘BOOLEAN’. All possible values are described at: https://cloud.google.com/bigquery/preparing-data-for-bigquery#datatypes
TableRow: Holds all values in a table row. Has one attribute, ‘f’, which is a
list of TableCell instances.
TableCell: Holds the value for one cell (or field). Has one attribute,
‘v’, which is a JsonValue instance. This class is defined in apitools.base.py.extra_types.py module.
class apache_beam.io.gcp.bigquery.TableRowJsonCoder(table_schema=None)[source]

Bases: apache_beam.coders.coders.Coder

A coder for a TableRow instance to/from a JSON string.

Note that the encoding operation (used when writing to sinks) requires the table schema in order to obtain the ordered list of field names. Reading from sources on the other hand does not need the table schema.

decode(encoded_table_row)[source]
encode(table_row)[source]
class apache_beam.io.gcp.bigquery.BigQueryDisposition[source]

Bases: object

Class holding standard strings used for create and write dispositions.

CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
CREATE_NEVER = 'CREATE_NEVER'
WRITE_APPEND = 'WRITE_APPEND'
WRITE_EMPTY = 'WRITE_EMPTY'
WRITE_TRUNCATE = 'WRITE_TRUNCATE'
static validate_create(disposition)[source]
static validate_write(disposition)[source]
class apache_beam.io.gcp.bigquery.BigQuerySource(table=None, dataset=None, project=None, query=None, validate=False, coder=None, use_standard_sql=False, flatten_results=True)[source]

Bases: apache_beam.runners.dataflow.native_io.iobase.NativeSource

A source based on a BigQuery table.

display_data()[source]
format

Source format name required for remote execution.

reader(test_bigquery_client=None)[source]
class apache_beam.io.gcp.bigquery.BigQuerySink(table, dataset=None, project=None, schema=None, create_disposition='CREATE_IF_NEEDED', write_disposition='WRITE_EMPTY', validate=False, coder=None)[source]

Bases: apache_beam.runners.dataflow.native_io.iobase.NativeSink

A sink based on a BigQuery table.

display_data()[source]
format

Sink format name required for remote execution.

schema_as_json()[source]

Returns the TableSchema associated with the sink as a JSON string.

writer(test_bigquery_client=None, buffer_size=None)[source]
class apache_beam.io.gcp.bigquery.WriteToBigQuery(table, dataset=None, project=None, schema=None, create_disposition='CREATE_IF_NEEDED', write_disposition='WRITE_APPEND', batch_size=None, test_client=None)[source]

Bases: apache_beam.transforms.ptransform.PTransform

display_data()[source]
expand(pcoll)[source]
static get_dict_table_schema(schema)[source]

Transform the table schema into a dictionary instance.

Parameters:schema – The schema to be used if the BigQuery table to write has to be created. This can either be a dict or string or in the TableSchema format.
Returns:
The schema to be used if the BigQuery table to write has
to be created but in the dictionary format.
Return type:table_schema
static get_table_schema_from_string(schema)[source]

Transform the string table schema into a bigquery.TableSchema instance.

Parameters:schema – The sting schema to be used if the BigQuery table to write has to be created.
Returns:
The schema to be used if the BigQuery table to write has
to be created but in the bigquery.TableSchema format.
Return type:table_schema
static table_schema_to_dict(table_schema)[source]

Create a dictionary representation of table schema for serialization

apache_beam.io.gcp.gcsfilesystem module

GCS file system implementation for accessing files on GCS.

class apache_beam.io.gcp.gcsfilesystem.GCSFileSystem[source]

Bases: apache_beam.io.filesystem.FileSystem

A GCS FileSystem implementation for accessing files on GCS.

CHUNK_SIZE = 100
GCS_PREFIX = 'gs://'
copy(source_file_names, destination_file_names)[source]

Recursively copy the file tree from the source to the destination

Parameters:
  • source_file_names – list of source file objects that needs to be copied
  • destination_file_names – list of destination of the new object
Raises:

BeamIOError if any of the copy operations fail

create(path, mime_type='application/octet-stream', compression_type='auto')[source]

Returns a write channel for the given file path.

Parameters:
  • path – string path of the file object to be written to the system
  • mime_type – MIME type to specify the type of content in the file object
  • compression_type – Type of compression to be used for this object

Returns: file handle with a close function for the user to use

delete(paths)[source]

Deletes files or directories at the provided paths. Directories will be deleted recursively.

Parameters:paths – list of paths that give the file objects to be deleted
exists(path)[source]

Check if the provided path exists on the FileSystem.

Parameters:path – string path that needs to be checked.

Returns: boolean flag indicating if path exists

join(basepath, *paths)[source]

Join two or more pathname components for the filesystem

Parameters:
  • basepath – string path of the first component of the path
  • paths – path components to be added

Returns: full path after combining all the passed components

match(patterns, limits=None)[source]

Find all matching paths to the pattern provided.

Parameters:
  • pattern – string for the file path pattern to match against
  • limit – Maximum number of responses that need to be fetched

Returns: list of MatchResult objects.

Raises:BeamIOError if any of the pattern match operations fail
mkdirs(path)[source]

Recursively create directories for the provided path.

Parameters:path – string path of the directory structure that should be created
Raises:IOError if leaf directory already exists.
open(path, mime_type='application/octet-stream', compression_type='auto')[source]

Returns a read channel for the given file path.

Parameters:
  • path – string path of the file object to be written to the system
  • mime_type – MIME type to specify the type of content in the file object
  • compression_type – Type of compression to be used for this object

Returns: file handle with a close function for the user to use

rename(source_file_names, destination_file_names)[source]

Rename the files at the source list to the destination list. Source and destination lists should be of the same size.

Parameters:
  • source_file_names – List of file paths that need to be moved
  • destination_file_names – List of destination_file_names for the files
Raises:

BeamIOError if any of the rename operations fail

classmethod scheme()[source]

URI scheme for the FileSystem

split(path)[source]

Splits the given path into two parts.

Splits the path into a pair (head, tail) such that tail contains the last component of the path and head contains everything up to that.

Head will include the GCS prefix (‘gs://’).

Parameters:path – path as a string
Returns:a pair of path components as strings.

apache_beam.io.gcp.gcsio module

Google Cloud Storage client.

This library evolved from the Google App Engine GCS client available at https://github.com/GoogleCloudPlatform/appengine-gcs-client.

class apache_beam.io.gcp.gcsio.GcsIO(storage_client=None)[source]

Bases: object

Google Cloud Storage I/O client.

copy(*args, **kwargs)
copy_batch(src_dest_pairs)[source]

Copies the given GCS object from src to dest.

Parameters:src_dest_pairs – list of (src, dest) tuples of gs://<bucket>/<name> files paths to copy from src to dest, not to exceed MAX_BATCH_OPERATION_SIZE in length.
Returns: List of tuples of (src, dest, exception) in the same order as the
src_dest_pairs argument, where exception is None if the operation succeeded or the relevant exception if the operation failed.
copytree(src, dest)[source]

Renames the given GCS “directory” recursively from src to dest.

Parameters:
  • src – GCS file path pattern in the form gs://<bucket>/<name>/.
  • dest – GCS file path pattern in the form gs://<bucket>/<name>/.
delete(*args, **kwargs)
delete_batch(paths)[source]

Deletes the objects at the given GCS paths.

Parameters:paths – List of GCS file path patterns in the form gs://<bucket>/<name>, not to exceed MAX_BATCH_OPERATION_SIZE in length.
Returns: List of tuples of (path, exception) in the same order as the paths
argument, where exception is None if the operation succeeded or the relevant exception if the operation failed.
exists(*args, **kwargs)
glob(*args, **kwargs)
open(filename, mode='r', read_buffer_size=16777216, mime_type='application/octet-stream')[source]

Open a GCS file path for reading or writing.

Parameters:
  • filename – GCS file path in the form gs://<bucket>/<object>.
  • mode – ‘r’ for reading or ‘w’ for writing.
  • read_buffer_size – Buffer size to use during read operations.
  • mime_type – Mime type to set for write operations.
Returns:

file object.

Raises:

ValueError – Invalid open file mode.

rename(src, dest)[source]

Renames the given GCS object from src to dest.

Parameters:
  • src – GCS file path pattern in the form gs://<bucket>/<name>.
  • dest – GCS file path pattern in the form gs://<bucket>/<name>.
size(*args, **kwargs)
size_of_files_in_glob(*args, **kwargs)

apache_beam.io.gcp.pubsub module

Google Cloud PubSub sources and sinks.

Cloud Pub/Sub sources and sinks are currently supported only in streaming pipelines, during remote execution.

This API is currently under development and is subject to change.

class apache_beam.io.gcp.pubsub.ReadStringsFromPubSub(topic=None, subscription=None, id_label=None)[source]

Bases: apache_beam.transforms.ptransform.PTransform

A PTransform for reading utf-8 string payloads from Cloud Pub/Sub.

expand(pvalue)[source]
get_windowing(unused_inputs)[source]
class apache_beam.io.gcp.pubsub.WriteStringsToPubSub(topic)[source]

Bases: apache_beam.transforms.ptransform.PTransform

A PTransform for writing utf-8 string payloads to Cloud Pub/Sub.

expand(pcoll)[source]

Module contents