#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Metadata for use in BigQueryIO, i.e. a job_id to use in BQ job labels."""

# pytype: skip-file

import re

# Conservative pattern for a cloud label value: 1 to 63 characters drawn from
# lowercase ASCII letters, digits, underscores and hyphens, anchored at both
# ends. The end anchor must not be backslash-escaped: an escaped '$' demands a
# literal dollar sign, which rejects every real label. '\Z' is used rather
# than '$' so that a value with a trailing newline is rejected as well.
_VALID_CLOUD_LABEL_PATTERN = re.compile(r'^[a-z0-9_-]{1,63}\Z')

def _sanitize_value(value):
  """Turns an arbitrary string into a candidate BigQuery label value.

  The input is lowercased, every '/' is replaced by '-', each run of
  characters that is neither a word character nor '-' is removed, and the
  result is cut to at most 63 characters.

  Args:
    value: The string to sanitize.

  Returns:
    The sanitized string, at most 63 characters long (possibly empty).
  """
  normalized = value.lower().replace('/', '-')
  cleaned = re.sub(r'[^\w-]+', '', normalized)
  return cleaned[:63]

def _is_valid_cloud_label_value(label_value):
  """Returns True if label_value is accepted as a cloud label string.

  The check is deliberately stricter than the real cloud label rules (for
  example, foreign language characters are not accepted), so this function
  can return False for a label value that is in fact valid. It is intended
  never to accept an invalid value. Thus, this should not be used as a
  generic validator for all cloud labels.

  Args:
    label_value: The label value to validate.

  Returns:
    True if the label value matches _VALID_CLOUD_LABEL_PATTERN, False
    otherwise.
  """
  # Coerce the match object (or None) to a real bool so the return value
  # agrees with the documented contract.
  return bool(_VALID_CLOUD_LABEL_PATTERN.match(label_value))

This will request metadata properly based on which runner is being used.
"""
# If a dataflow_job id is returned from GCE metadata, it means
# this program is running on a Dataflow GCE VM.
is_dataflow_runner = bool(dataflow_job_id)
kwargs = {}
if is_dataflow_runner:
# Only use this label if it has already been validated,
# as we do not want a bad label to fail the BQ job.
if _is_valid_cloud_label_value(dataflow_job_id):
kwargs['beam_job_id'] = dataflow_job_id
if step_name:
step_name = _sanitize_value(step_name)
if _is_valid_cloud_label_value(step_name):
kwargs['step_name'] = step_name

"""Metadata class for BigQueryIO. i.e. to use as BQ job labels.

Do not construct directly; use the create_bigquery_io_metadata factory,
which will request metadata properly based on which runner is being used.
"""
def __init__(self, beam_job_id=None, step_name=None):
  """Stores the given metadata values on the instance.

  Args:
    beam_job_id: Value stored as the beam_job_id attribute; defaults to None.
    step_name: Value stored as the step_name attribute; defaults to None.
  """
  self.beam_job_id, self.step_name = beam_job_id, step_name