Source code for apache_beam.testing.pipeline_verifiers

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""End-to-end test result verifiers

A set of verifiers that are used in end-to-end tests to verify state/output
of test pipeline job. Customized verifier should extend
`hamcrest.core.base_matcher.BaseMatcher` and override _matches.
"""

import logging
import time

from hamcrest.core.base_matcher import BaseMatcher

from apache_beam.io.filesystems import FileSystems
from apache_beam.runners.runner import PipelineState
from apache_beam.testing import test_utils as utils
from apache_beam.utils import retry

__all__ = [
    'PipelineStateMatcher',
    'FileChecksumMatcher',
    'retry_on_io_error_and_server_error',
    ]


try:
  from apitools.base.py.exceptions import HttpError
except ImportError:
  HttpError = None

MAX_RETRIES = 4


[docs]class PipelineStateMatcher(BaseMatcher): """Matcher that verify pipeline job terminated in expected state Matcher compares the actual pipeline terminate state with expected. By default, `PipelineState.DONE` is used as expected state. """ def __init__(self, expected_state=PipelineState.DONE): self.expected_state = expected_state def _matches(self, pipeline_result): return pipeline_result.state == self.expected_state
[docs] def describe_to(self, description): description \ .append_text("Test pipeline expected terminated in state: ") \ .append_text(self.expected_state)
[docs] def describe_mismatch(self, pipeline_result, mismatch_description): mismatch_description \ .append_text("Test pipeline job terminated in state: ") \ .append_text(pipeline_result.state)
[docs]def retry_on_io_error_and_server_error(exception): """Filter allowing retries on file I/O errors and service error.""" return isinstance(exception, IOError) or \ (HttpError is not None and isinstance(exception, HttpError))
[docs]class FileChecksumMatcher(BaseMatcher): """Matcher that verifies file(s) content by comparing file checksum. Use apache_beam.io.filebasedsink to fetch file(s) from given path. File checksum is a hash string computed from content of file(s). """ def __init__(self, file_path, expected_checksum, sleep_secs=None): """Initialize a FileChecksumMatcher object Args: file_path : A string that is the full path of output file. This path can contain globs. expected_checksum : A hash string that is computed from expected result. sleep_secs : Number of seconds to wait before verification start. Extra time are given to make sure output files are ready on FS. """ if sleep_secs is not None: if isinstance(sleep_secs, int): self.sleep_secs = sleep_secs else: raise ValueError('Sleep seconds, if received, must be int. ' 'But received: %r, %s' % (sleep_secs, type(sleep_secs))) else: self.sleep_secs = None self.file_path = file_path self.expected_checksum = expected_checksum @retry.with_exponential_backoff( num_retries=MAX_RETRIES, retry_filter=retry_on_io_error_and_server_error) def _read_with_retry(self): """Read path with retry if I/O failed""" read_lines = [] match_result = FileSystems.match([self.file_path])[0] matched_path = [f.path for f in match_result.metadata_list] if not matched_path: raise IOError('No such file or directory: %s' % self.file_path) logging.info('Find %d files in %s: \n%s', len(matched_path), self.file_path, '\n'.join(matched_path)) for path in matched_path: with FileSystems.open(path, 'r') as f: for line in f: read_lines.append(line) return read_lines def _matches(self, _): if self.sleep_secs: # Wait to have output file ready on FS logging.info('Wait %d seconds...', self.sleep_secs) time.sleep(self.sleep_secs) # Read from given file(s) path read_lines = self._read_with_retry() # Compute checksum self.checksum = utils.compute_hash(read_lines) logging.info('Read from given path %s, %d lines, checksum: %s.', self.file_path, len(read_lines), self.checksum) return self.checksum == self.expected_checksum
[docs] def describe_to(self, description): description \ .append_text("Expected checksum is ") \ .append_text(self.expected_checksum)
[docs] def describe_mismatch(self, pipeline_result, mismatch_description): mismatch_description \ .append_text("Actual checksum is ") \ .append_text(self.checksum)