apache_beam.io.gcp package
Subpackages
Submodules
apache_beam.io.gcp.bigquery module
BigQuery sources and sinks.
This module implements reading from and writing to BigQuery tables. It relies on several classes exposed by the BigQuery API: TableSchema, TableFieldSchema, TableRow, and TableCell. The default mode is to return table rows read from a BigQuery source as dictionaries. Similarly, a Write transform to a BigQuerySink accepts PCollections of dictionaries. This is done for more convenient programming. If desired, the native TableRow objects can be used throughout to represent rows (use an instance of TableRowJsonCoder as a coder argument when creating the sources or sinks respectively).
Also, for programming convenience, instances of TableReference and TableSchema have a string representation that can be used for the corresponding arguments:
- TableReference can be a PROJECT:DATASET.TABLE or DATASET.TABLE string.
- TableSchema can be a NAME:TYPE{,NAME:TYPE}* string (e.g. ‘month:STRING,event_count:INTEGER’).
The syntax supported is described here: https://cloud.google.com/bigquery/bq-command-line-tool-quickstart
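For example, both string forms can be passed directly when constructing sources and sinks (a minimal sketch; the project, dataset, and table names below are hypothetical):

  import apache_beam as beam

  # Table reference and schema supplied as strings.
  source = beam.io.BigQuerySource('my-project:somedataset.weather_stations')
  sink = beam.io.BigQuerySink('somedataset.monthly_counts',
                              schema='month:STRING,event_count:INTEGER')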
BigQuery sources can be used as main inputs or side inputs. A main input (common case) is expected to be massive and will be split into manageable chunks and processed in parallel. Side inputs are expected to be small and will be read completely every time a ParDo DoFn gets executed. In the example below, the lambda function implementing the DoFn for the Map transform will, on each call, receive one row of the main table and all rows of the side table. The runner may use some caching techniques to share the side inputs between calls in order to avoid excessive reading:
  main_table = pipeline | 'VeryBig' >> beam.io.Read(beam.io.BigQuerySource())
  side_table = pipeline | 'NotBig' >> beam.io.Read(beam.io.BigQuerySource())
  results = (
      main_table
      | 'ProcessData' >> beam.Map(
          lambda element, side_input: ..., AsList(side_table)))
There is no difference in how main and side inputs are read. What makes the side_table a ‘side input’ is the AsList wrapper used when passing the table as a parameter to the Map transform. AsList signals to the execution framework that its input should be made available whole.
The main and side inputs are implemented differently. Reading a BigQuery table as main input entails exporting the table to a set of GCS files (currently in JSON format) and then processing those files. Reading the same table as a side input entails querying the table for all its rows. The coder argument on BigQuerySource controls the reading of the lines in the export files (i.e., transform a JSON object into a PCollection element). The coder is not involved when the same table is read as a side input since there is no intermediate format involved. We get the table rows directly from the BigQuery service with a query.
Users may provide a query to read from rather than reading all of a BigQuery table. If a query is specified, its result will be used as the data of the input transform:
  query_results = pipeline | beam.io.Read(beam.io.BigQuerySource(
      query='SELECT year, mean_temp FROM samples.weather_stations'))
When creating a BigQuery input transform, users should provide either a query or a table. Pipeline construction will fail with a validation error if neither or both are specified.
Short introduction to BigQuery concepts:
Tables have rows (TableRow) and each row has cells (TableCell). A table has a schema (TableSchema), which in turn describes the schema of each cell (TableFieldSchema). The terms field and cell are used interchangeably.
- TableSchema: Describes the schema (types and order) for values in each row. Has one attribute, 'fields', which is a list of TableFieldSchema objects.
- TableFieldSchema: Describes the schema (type, name) for one field. Has several attributes, including 'name' and 'type'. Common values for the type attribute are: 'STRING', 'INTEGER', 'FLOAT', 'BOOLEAN'. All possible values are described at: https://cloud.google.com/bigquery/preparing-data-for-bigquery#datatypes
- TableRow: Holds all values in a table row. Has one attribute, 'f', which is a list of TableCell instances.
- TableCell: Holds the value for one cell (or field). Has one attribute, 'v', which is a JsonValue instance. This class is defined in the apitools.base.py.extra_types.py module.
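For example, a two-field schema could be built programmatically like this (a minimal sketch assuming the apitools-generated message classes bundled with Beam under apache_beam.io.gcp.internal.clients; the field names are hypothetical):

  from apache_beam.io.gcp.internal.clients import bigquery

  table_schema = bigquery.TableSchema()

  month_field = bigquery.TableFieldSchema()
  month_field.name = 'month'
  month_field.type = 'STRING'
  table_schema.fields.append(month_field)

  count_field = bigquery.TableFieldSchema()
  count_field.name = 'event_count'
  count_field.type = 'INTEGER'
  table_schema.fields.append(count_field)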
class apache_beam.io.gcp.bigquery.TableRowJsonCoder(table_schema=None)
Bases: apache_beam.coders.coders.Coder

A coder for a TableRow instance to/from a JSON string.

Note that the encoding operation (used when writing to sinks) requires the table schema in order to obtain the ordered list of field names. Reading from sources, on the other hand, does not need the table schema.
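As a rough usage sketch, the coder is passed as the coder argument of a source or sink; this assumes a Pipeline object named pipeline and `import apache_beam as beam` as in the examples above, and reuses the hypothetical table_schema built earlier (table names are also hypothetical):

  from apache_beam.io.gcp.bigquery import TableRowJsonCoder

  # Decoding (reading) does not need the schema.
  rows = pipeline | beam.io.Read(beam.io.BigQuerySource(
      'somedataset.monthly_counts', coder=TableRowJsonCoder()))

  # Encoding (writing) does need it, so the coder knows the field order.
  rows | beam.io.Write(beam.io.BigQuerySink(
      'somedataset.monthly_counts_copy',
      schema='month:STRING,event_count:INTEGER',
      coder=TableRowJsonCoder(table_schema=table_schema)))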
class apache_beam.io.gcp.bigquery.BigQueryDisposition
Bases: object

Class holding standard strings used for create and write dispositions.

CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
CREATE_NEVER = 'CREATE_NEVER'
WRITE_APPEND = 'WRITE_APPEND'
WRITE_EMPTY = 'WRITE_EMPTY'
WRITE_TRUNCATE = 'WRITE_TRUNCATE'
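These strings are what the create_disposition and write_disposition arguments of BigQuerySink expect, so the constants can be passed directly (a sketch; beam refers to apache_beam as in earlier snippets, and the table and schema are hypothetical):

  from apache_beam.io.gcp.bigquery import BigQueryDisposition

  sink = beam.io.BigQuerySink(
      'somedataset.monthly_counts',
      schema='month:STRING,event_count:INTEGER',
      create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=BigQueryDisposition.WRITE_TRUNCATE)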
class apache_beam.io.gcp.bigquery.BigQuerySource(table=None, dataset=None, project=None, query=None, validate=False, coder=None, use_standard_sql=False, flatten_results=True)
Bases: apache_beam.runners.dataflow.native_io.iobase.NativeSource

A source based on a BigQuery table.

format
Source format name required for remote execution.
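As a sketch, a source can be built either from a table reference or from a query; use_standard_sql is assumed here to switch the query to the Standard SQL dialect (dataset and table names are hypothetical):

  # Read an entire table, identified by a PROJECT:DATASET.TABLE string.
  table_source = beam.io.BigQuerySource(
      table='my-project:somedataset.weather_stations')

  # Or read the rows produced by a query written in standard SQL.
  query_source = beam.io.BigQuerySource(
      query='SELECT year, mean_temp FROM `my-project.samples.weather_stations`',
      use_standard_sql=True)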
class apache_beam.io.gcp.bigquery.BigQuerySink(table, dataset=None, project=None, schema=None, create_disposition='CREATE_IF_NEEDED', write_disposition='WRITE_EMPTY', validate=False, coder=None)
Bases: apache_beam.runners.dataflow.native_io.iobase.NativeSink

A sink based on a BigQuery table.

format
Sink format name required for remote execution.
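A typical write wires the sink into a pipeline with a PCollection of dictionaries, since that is what the sink accepts by default (a minimal sketch; pipeline and beam are as in the earlier examples, and the table and column names are hypothetical):

  counts = pipeline | 'CreateCounts' >> beam.Create([
      {'month': 'January', 'event_count': 42},
      {'month': 'February', 'event_count': 17}])
  counts | 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink(
      'somedataset.monthly_counts',
      schema='month:STRING,event_count:INTEGER'))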
apache_beam.io.gcp.gcsfilesystem module
GCS file system implementation for accessing files on GCS.

class apache_beam.io.gcp.gcsfilesystem.GCSFileSystem
Bases: apache_beam.io.filesystem.FileSystem

A GCS FileSystem implementation for accessing files on GCS.

CHUNK_SIZE = 100
GCS_PREFIX = 'gs://'
copy(source_file_names, destination_file_names)
Recursively copy the file tree from the source to the destination.
Parameters:
- source_file_names – list of source file objects that need to be copied
- destination_file_names – list of destinations for the new objects
Raises: BeamIOError if any of the copy operations fail

create(path, mime_type='application/octet-stream', compression_type='auto')
Returns a write channel for the given file path.
Parameters:
- path – string path of the file object to be written to the system
- mime_type – MIME type to specify the type of content in the file object
- compression_type – Type of compression to be used for this object
Returns: file handle with a close function for the user to use

delete(paths)
Deletes files or directories at the provided paths. Directories will be deleted recursively.
Parameters:
- paths – list of paths that give the file objects to be deleted

exists(path)
Check if the provided path exists on the FileSystem.
Parameters:
- path – string path that needs to be checked
Returns: boolean flag indicating if path exists

join(basepath, *paths)
Join two or more pathname components for the filesystem.
Parameters:
- basepath – string path of the first component of the path
- paths – path components to be added
Returns: full path after combining all the passed components
match(patterns, limits=None)
Find all matching paths to the patterns provided.
Parameters:
- patterns – list of string file path patterns to match against
- limits – maximum number of responses that need to be fetched
Returns: list of MatchResult objects.
Raises: BeamIOError if any of the pattern match operations fail

mkdirs(path)
Recursively create directories for the provided path.
Parameters:
- path – string path of the directory structure that should be created
Raises: IOError if leaf directory already exists.

open(path, mime_type='application/octet-stream', compression_type='auto')
Returns a read channel for the given file path.
Parameters:
- path – string path of the file object to be read from the system
- mime_type – MIME type to specify the type of content in the file object
- compression_type – Type of compression to be used for this object
Returns: file handle with a close function for the user to use

rename(source_file_names, destination_file_names)
Rename the files at the source list to the destination list. Source and destination lists should be of the same size.
Parameters:
- source_file_names – list of file paths that need to be moved
- destination_file_names – list of destination paths for the files
Raises: BeamIOError if any of the rename operations fail

split(path)
Splits the given path into two parts.
Splits the path into a pair (head, tail) such that tail contains the last component of the path and head contains everything up to that.
Head will include the GCS prefix ('gs://').
Parameters:
- path – path as a string
Returns: a pair of path components as strings.
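A rough usage sketch of the filesystem API (the bucket and object names are hypothetical):

  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

  fs = GCSFileSystem()
  path = fs.join('gs://my-bucket/logs', '2017', 'events.json')
  if fs.exists(path):
      f = fs.open(path)            # read channel
      first_bytes = f.read(1024)
      f.close()
  head, tail = fs.split(path)      # head keeps the 'gs://' prefix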
apache_beam.io.gcp.gcsio module
Google Cloud Storage client.
This library evolved from the Google App Engine GCS client available at https://github.com/GoogleCloudPlatform/appengine-gcs-client.

class apache_beam.io.gcp.gcsio.GcsIO(storage_client=None)
Bases: object

Google Cloud Storage I/O client.

copy(*args, **kwargs)

copy_batch(src_dest_pairs)
Copies the given GCS objects from src to dest.
Parameters:
- src_dest_pairs – list of (src, dest) tuples of gs://<bucket>/<name> file paths to copy from src to dest, not to exceed MAX_BATCH_OPERATION_SIZE in length.
Returns: list of (src, dest, exception) tuples in the same order as the src_dest_pairs argument, where exception is None if the operation succeeded or the relevant exception if the operation failed.
copytree(src, dest)
Copies the given GCS "directory" recursively from src to dest.
Parameters:
- src – GCS file path pattern in the form gs://<bucket>/<name>/.
- dest – GCS file path pattern in the form gs://<bucket>/<name>/.

delete(*args, **kwargs)

delete_batch(paths)
Deletes the objects at the given GCS paths.
Parameters:
- paths – list of GCS file path patterns in the form gs://<bucket>/<name>, not to exceed MAX_BATCH_OPERATION_SIZE in length.
Returns: list of (path, exception) tuples in the same order as the paths argument, where exception is None if the operation succeeded or the relevant exception if the operation failed.
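The batch operations report a per-item result, so the returned tuples can be inspected for failures (a sketch; the paths are hypothetical):

  from apache_beam.io.gcp.gcsio import GcsIO

  gcs = GcsIO()
  results = gcs.delete_batch(
      ['gs://my-bucket/old/a.txt', 'gs://my-bucket/old/b.txt'])
  failures = [(path, err) for path, err in results if err is not None]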
exists(*args, **kwargs)

glob(*args, **kwargs)

open(filename, mode='r', read_buffer_size=16777216, mime_type='application/octet-stream')
Open a GCS file path for reading or writing.
Parameters:
- filename – GCS file path in the form gs://<bucket>/<object>.
- mode – 'r' for reading or 'w' for writing.
- read_buffer_size – Buffer size to use during read operations.
- mime_type – Mime type to set for write operations.
Returns: file object.
Raises: ValueError – Invalid open file mode.

rename(src, dest)
Renames the given GCS object from src to dest.
Parameters:
- src – GCS file path pattern in the form gs://<bucket>/<name>.
- dest – GCS file path pattern in the form gs://<bucket>/<name>.

size(*args, **kwargs)

size_of_files_in_glob(*args, **kwargs)
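A rough usage sketch of the client (bucket and object names are hypothetical):

  from apache_beam.io.gcp.gcsio import GcsIO

  gcs = GcsIO()

  # Write an object, then read it back.
  writer = gcs.open('gs://my-bucket/hello.txt', 'w')
  writer.write(b'hello world')
  writer.close()

  reader = gcs.open('gs://my-bucket/hello.txt', 'r')
  data = reader.read()
  reader.close()

  gcs.copy('gs://my-bucket/hello.txt', 'gs://my-bucket/hello_copy.txt')
  gcs.delete('gs://my-bucket/hello.txt')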
apache_beam.io.gcp.pubsub module
Google Cloud PubSub sources and sinks.
Cloud Pub/Sub sources and sinks are currently supported only in streaming pipelines, during remote execution.

class apache_beam.io.gcp.pubsub.PubSubSink(topic, coder=StrUtf8Coder)
Bases: apache_beam.runners.dataflow.native_io.iobase.NativeSink

Sink for writing to a given Cloud Pub/Sub topic.

format
Sink format name required for remote execution.

class apache_beam.io.gcp.pubsub.PubSubSource(topic, subscription=None, id_label=None, coder=StrUtf8Coder)
Bases: apache_beam.runners.dataflow.native_io.iobase.NativeSource

Source for reading from a given Cloud Pub/Sub topic.

topic
Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".

subscription
Optional existing Cloud Pub/Sub subscription to use in the form "projects/<project>/subscriptions/<subscription>".

id_label
The attribute on incoming Pub/Sub messages to use as a unique record identifier. When specified, the value of this attribute (which can be any string that uniquely identifies the record) will be used for deduplication of messages. If not provided, Dataflow cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream. In this case, deduplication of the stream will be strictly best effort.

coder
The Coder to use for decoding incoming Pub/Sub messages.

format
Source format name required for remote execution.
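As a sketch, reading from one topic and publishing to another in a streaming pipeline (pipeline and beam are as in the earlier examples; the project and topic names are hypothetical, and recall that these transforms only work in streaming pipelines during remote execution):

  from apache_beam.io.gcp.pubsub import PubSubSink, PubSubSource

  messages = pipeline | 'ReadMessages' >> beam.io.Read(
      PubSubSource('/topics/my-project/input-topic'))
  messages | 'WriteMessages' >> beam.io.Write(
      PubSubSink('/topics/my-project/output-topic'))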