# Source code for apache_beam.io.gcp.tests.bigquery_matcher
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Bigquery data verifier for end-to-end test."""
import logging
from hamcrest.core.base_matcher import BaseMatcher
from apache_beam.testing.test_utils import compute_hash
from apache_beam.utils import retry
__all__ = ['BigqueryMatcher']
# Protect against environments where bigquery library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
try:
from google.cloud import bigquery
from google.cloud.exceptions import GoogleCloudError
except ImportError:
bigquery = None
# pylint: enable=wrong-import-order, wrong-import-position
# Number of retries handed to retry.with_exponential_backoff for the
# Bigquery query in BigqueryMatcher._query_with_retry.
MAX_RETRIES = 4
def retry_on_http_and_value_error(exception):
  """Retry filter: tell the retry decorator whether to try again.

  Args:
    exception: The exception instance raised by the wrapped call.

  Returns:
    True when ``exception`` is a ``GoogleCloudError`` or a ``ValueError``,
    False for every other exception type.
  """
  retryable_types = (GoogleCloudError, ValueError)
  return isinstance(exception, retryable_types)
class BigqueryMatcher(BaseMatcher):
  """Matcher that verifies Bigquery data with given query.

  Fetch Bigquery data with given query, compute a hash string and compare
  with expected checksum.
  """

  def __init__(self, project, query, checksum):
    """Validate the arguments and store them on the matcher.

    Args:
      project: Project handed to ``bigquery.Client`` when the query is run.
      query: Non-empty string containing the query to execute.
      checksum: Non-empty string; the hash the query result is expected
        to produce.

    Raises:
      ImportError: If the Bigquery client library could not be imported.
      ValueError: If ``query`` or ``checksum`` is empty or not a string.
    """
    if bigquery is None:
      raise ImportError(
          'Bigquery dependencies are not installed.')
    if not query or not isinstance(query, str):
      raise ValueError(
          'Invalid argument: query. Please use non-empty string')
    if not checksum or not isinstance(checksum, str):
      raise ValueError(
          'Invalid argument: checksum. Please use non-empty string')
    self.project = project
    self.query = query
    self.expected_checksum = checksum
    # Filled in by _matches(). Defined up front so describe_mismatch() can be
    # called safely even if _matches() has not completed yet.
    self.checksum = None

  def _matches(self, _):
    """Run the query, hash the rows and compare with the expected checksum.

    The matched item is ignored; all inputs come from the constructor.

    Returns:
      True if the computed checksum equals ``self.expected_checksum``.
    """
    logging.info('Start verify Bigquery data.')
    # Run query
    bigquery_client = bigquery.Client(project=self.project)
    response = self._query_with_retry(bigquery_client)
    logging.info('Read from given query (%s), total rows %d',
                 self.query, len(response))

    # Compute checksum
    self.checksum = compute_hash(response)
    logging.info('Generate checksum: %s', self.checksum)

    # Verify result
    return self.checksum == self.expected_checksum

  @retry.with_exponential_backoff(
      num_retries=MAX_RETRIES,
      retry_filter=retry_on_http_and_value_error)
  def _query_with_retry(self, bigquery_client):
    """Run Bigquery query with retry if got error http response.

    Args:
      bigquery_client: Client object used to issue ``self.query``.

    Returns:
      A list with the rows of every result page, in the order fetched.
    """
    query = bigquery_client.run_sync_query(self.query)
    query.run()

    # Fetch query data one page at a time.
    page_token = None
    results = []
    while True:
      # The middle element of the returned triple is not needed here.
      rows, _, page_token = query.fetch_data(page_token=page_token)
      results.extend(rows)
      # A falsy token means there are no more pages to request.
      if not page_token:
        break

    return results

  def describe_to(self, description):
    """Append the expected checksum to ``description``."""
    description \
      .append_text("Expected checksum is ") \
      .append_text(self.expected_checksum)

  def describe_mismatch(self, pipeline_result, mismatch_description):
    """Append the checksum that was actually computed.

    ``pipeline_result`` is unused. If no checksum has been computed yet the
    text ``None`` is reported instead of raising ``AttributeError``.
    """
    # str() is a no-op for a computed string checksum and makes the
    # not-yet-computed None value safe to append as text.
    mismatch_description \
      .append_text("Actual checksum is ") \
      .append_text(str(self.checksum))