Google BigQuery patterns
The samples on this page show you common patterns for use with BigQueryIO.
BigQueryIO deadletter pattern
In production systems, it is useful to implement the deadletter pattern with BigQueryIO: any elements that BigQueryIO fails to process are output to a separate PCollection for further processing. The samples below print the errors, but a production system can instead write them to a deadletter table for later correction.
With the Java SDK, when using STREAMING_INSERTS you can use the WriteResult object to access a PCollection with the TableRows that failed to be inserted into BigQuery. If you also set the withExtendedErrorInfo property, you can access a PCollection<BigQueryInsertError> from the WriteResult. That PCollection then includes a reference to the table, the data row, and the InsertErrors. Which errors are added to the deadletter queue is determined by the InsertRetryPolicy.
With the Python SDK, the failed inserts are available under the FailedRows key of the result returned by the write.
PipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);

Pipeline p = Pipeline.create(options);

// Create a bug by writing the 2nd value as null. The API will correctly
// throw an error when trying to insert a null value into a REQUIRED field.
WriteResult result =
    p.apply(Create.of(1, 2))
        .apply(
            BigQueryIO.<Integer>write()
                .withSchema(
                    new TableSchema()
                        .setFields(
                            ImmutableList.of(
                                new TableFieldSchema()
                                    .setName("num")
                                    .setType("INTEGER")
                                    .setMode("REQUIRED"))))
                .to("Test.dummyTable")
                .withFormatFunction(x -> new TableRow().set("num", (x == 2) ? null : x))
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                // Force the bounded pipeline to use streaming inserts.
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                // Set the withExtendedErrorInfo property.
                .withExtendedErrorInfo()
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

// Print the table, row, and error for each failed insert.
result
    .getFailedInsertsWithErr()
    .apply(
        MapElements.into(TypeDescriptors.strings())
            .via(
                x -> {
                  System.out.println(" The table was " + x.getTable());
                  System.out.println(" The row was " + x.getRow());
                  System.out.println(" The error was " + x.getError());
                  return "";
                }));

p.run();

/* Sample output from the pipeline:
 The table was GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=Test,projectId=<>, tableId=dummyTable}}
 The row was GenericData{classInfo=[f], {num=null}}
 The error was GenericData{classInfo=[errors, index],{errors=[GenericData{classInfo=[debugInfo, location, message, reason], {debugInfo=,location=, message=Missing required field: Msg_0_CLOUD_QUERY_TABLE.num., reason=invalid}}],index=0}}
*/
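The sample output above reports reason=invalid, an error that retrying cannot fix, so the row ends up in the failed-inserts collection. To illustrate the kind of decision a retry policy makes, here is a minimal sketch in plain Python. It does not use or reproduce the Beam API: the names `PERSISTENT_REASONS`, `should_retry`, and `route` are hypothetical, and the set of persistent reasons is an assumption for illustration that may differ from the list the SDK uses internally.

```python
# Reasons treated as persistent in this sketch. This set is an assumption
# for illustration and may not match the SDK's internal list.
PERSISTENT_REASONS = frozenset({"invalid", "invalidQuery", "notImplemented"})


def should_retry(insert_error):
    """Return True if none of the error reasons in the payload is persistent."""
    reasons = [e.get("reason") for e in insert_error.get("errors", [])]
    return not any(reason in PERSISTENT_REASONS for reason in reasons)


def route(insert_errors):
    """Split error payloads into (retry, deadletter) lists."""
    retry, deadletter = [], []
    for err in insert_errors:
        (retry if should_retry(err) else deadletter).append(err)
    return retry, deadletter


# Example payloads shaped like the error in the sample output above.
missing_field = {
    "index": 0,
    "errors": [{"reason": "invalid", "message": "Missing required field"}],
}
timeout = {"index": 1, "errors": [{"reason": "timeout", "message": ""}]}
retry, deadletter = route([missing_field, timeout])
```

In this sketch, only payloads with a persistent reason are routed to the deadletter list; everything else is left for another attempt.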
import apache_beam as beam

# Create pipeline.
schema = ({'fields': [{'name': 'a', 'type': 'STRING', 'mode': 'REQUIRED'}]})

pipeline = beam.Pipeline()

errors = (
    pipeline | 'Data' >> beam.Create([1, 2])
    # Create a bug: the value 1 becomes None, which violates the REQUIRED mode.
    | 'CreateBrokenData' >>
    beam.Map(lambda src: {'a': src} if src == 2 else {'a': None})
    | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
        "<Your Project>:Test.dummy_a_table",
        schema=schema,
        insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_APPEND'))

# Print each row that failed to be inserted.
result = (
    errors['FailedRows']
    | 'PrintErrors' >>
    beam.FlatMap(lambda err: print("Error Found {}".format(err))))

pipeline.run()
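The samples above print the failed rows. As a minimal sketch of the step that could precede a write to a deadletter table, the plain-Python helper below flattens one failed element into a record. It assumes each failed element is a (destination, row) tuple, optionally followed by error details. The function name `to_deadletter_record` and the output field names are hypothetical and not part of the Beam API.

```python
import json
from datetime import datetime, timezone


def to_deadletter_record(failed, now=None):
    """Flatten one failed insert into a record for a deadletter table.

    Assumes `failed` is a (destination, row) or (destination, row, errors)
    tuple; the output field names are illustrative only.
    """
    destination, row = failed[0], failed[1]
    errors = failed[2] if len(failed) > 2 else None
    timestamp = now if now is not None else datetime.now(timezone.utc)
    return {
        'destination': str(destination),
        # Keep the original row as JSON so it can be corrected and replayed.
        'payload': json.dumps(row, sort_keys=True, default=str),
        'errors': None if errors is None else json.dumps(errors, default=str),
        'failed_at': timestamp.isoformat(),
    }


# Example: the row that violated the REQUIRED mode in the sample above.
record = to_deadletter_record(('Test.dummy_a_table', {'a': None}))
```

In a pipeline, a helper like this could be applied to errors['FailedRows'] with beam.Map before a second write that targets the deadletter table.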
Last updated on 2025/01/19