Source code for apache_beam.runners.interactive.sql.utils

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Module of utilities for SQL magics.

For internal use only; no backward-compatibility guarantees.
"""

# pytype: skip-file

import logging
import os
import tempfile
from dataclasses import dataclass
from typing import Any
from typing import Callable
from typing import Dict
from typing import NamedTuple
from typing import Optional
from typing import Type
from typing import Union

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.runners.interactive.utils import create_var_in_main
from apache_beam.runners.interactive.utils import progress_indicated
from apache_beam.runners.runner import create_runner
from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
from apache_beam.utils.interactive_utils import is_in_ipython

_LOGGER = logging.getLogger(__name__)


def register_coder_for_schema(
    schema: NamedTuple, verbose: bool = False) -> None:
  """Ensures the coder registry maps the given schema to a RowCoder.

  Nothing happens when the registry already returns a RowCoder for the
  schema. Otherwise the schema is registered with beam.coders.RowCoder, and
  when verbose is True a warning tells the user which registration statement
  was executed on their behalf.

  Args:
    schema: a typing.NamedTuple class (checked with an assert).
    verbose: whether to warn the user about the implicit registration.
  """
  assert match_is_named_tuple(schema), (
      'Schema %s is not a typing.NamedTuple.' % schema)
  registry = beam.coders.registry
  if isinstance(registry.get_coder(schema), beam.coders.RowCoder):
    # Already registered; nothing to do.
    return
  if verbose:
    _LOGGER.warning(
        'Schema %s has not been registered to use a RowCoder. '
        'Automatically registering it by running: '
        'beam.coders.registry.register_coder(%s, '
        'beam.coders.RowCoder)',
        schema.__name__,
        schema.__name__)
  registry.register_coder(schema, beam.coders.RowCoder)
def find_pcolls(
    sql: str,
    pcolls: Dict[str, beam.PCollection],
    verbose: bool = False) -> Dict[str, beam.PCollection]:
  """Returns the entries of pcolls whose names appear in the given sql query.

  Matching is a plain word-by-word comparison after splitting the query on
  whitespace: a name only matches when it forms a whole whitespace-separated
  token (e.g. the token ``pcoll,`` does not match the name ``pcoll``). This
  function does not collect any data itself; when verbose is True and
  something was found, it logs the matches and a 'Collecting data...' message.

  Args:
    sql: the SQL query text.
    pcolls: mapping from variable/tag name to PCollection.
    verbose: whether to log the matches.

  Returns:
    A dict (ordered by first appearance in the query) of the matched names to
    their PCollections; empty if nothing matched.
  """
  found = {token: pcolls[token] for token in sql.split() if token in pcolls}
  if found and verbose:
    _LOGGER.info('Found PCollections used in the magic: %s.', found)
    _LOGGER.info('Collecting data...')
  return found
def replace_single_pcoll_token(sql: str, pcoll_name: str) -> str:
  """Replaces the pcoll_name used in the sql with 'PCOLLECTION'.

  For sql query using only a single PCollection, the PCollection needs to be
  referred to as 'PCOLLECTION' instead of its variable/tag name.

  Every whitespace-separated token that equals pcoll_name and directly follows
  a FROM keyword (case-insensitive) is replaced. Queries with several FROM
  clauses, such as UNIONs, are therefore rewritten completely. The query is
  re-joined with single spaces, so the original whitespace (including
  newlines) is not preserved.

  Args:
    sql: the SQL query.
    pcoll_name: the variable/tag name of the single PCollection.

  Returns:
    The rewritten query.
  """
  words = sql.split()
  # Pair each word with its successor so the index of the token following a
  # FROM is always exact, no matter how many FROM keywords appear.
  for idx, (word, next_word) in enumerate(zip(words, words[1:])):
    if word.lower() == 'from' and next_word == pcoll_name:
      words[idx + 1] = 'PCOLLECTION'
  return ' '.join(words)
def pformat_namedtuple(schema: NamedTuple) -> str:
  """Renders a NamedTuple class as ``Name(field: repr(type), ...)``.

  Fields are listed in the order of the class's __annotations__.
  """
  fields = ', '.join(
      f'{name}: {field_type!r}'
      for name, field_type in schema.__annotations__.items())
  return f'{schema.__name__}({fields})'
def pformat_dict(raw_input: Dict[str, Any]) -> str:
  """Renders a dict as brace-wrapped ``key: value`` lines.

  Each entry is formatted with str() and entries are separated by a comma
  and a newline; the opening and closing braces sit on their own lines.
  """
  body = ',\n'.join(f'{key}: {value}' for key, value in raw_input.items())
  return '{\n' + body + '\n}'
@dataclass
class OptionsEntry:
  """One row of an OptionsForm: a labelled text input bound to PipelineOptions.

  Attributes:
    label: text shown in the row's Label widget.
    help: help text shown next to the input, usually the same as the help of
      the corresponding PipelineOptions argument.
    cls: the PipelineOptions class/subclass whose view receives the value.
    arg_builder: how the input value becomes option arguments. A str names a
      single argument that is set to the raw input value. A Dict maps argument
      names to an optional Callable: when the Callable is present it converts
      the input value for that argument, otherwise the raw value is used. This
      lets one input fill several related arguments, e.g. staging_location and
      temp_location of GoogleCloudOptions.
    default: initial value of the input, None if absent.
  """
  label: str
  help: str
  cls: Type[PipelineOptions]
  arg_builder: Union[str, Dict[str, Optional[Callable]]]
  default: Optional[str] = None

  def __post_init__(self):
    # Deliberately not a dataclass field. Holds the input ipywidget once the
    # form is displayed (currently only Text is supported); the str value is
    # read via self.input.value.
    self.input = None
class OptionsForm:
  """Interactive IPython notebook form that collects user inputs and turns
  them into PipelineOptions for running pipelines.
  """
  def __init__(self):
    # All flags are ignored on purpose. The current Python SDK incorrectly
    # parses unparsable pipeline options, and the beam_sql magic does not use
    # flags anyway.
    self.options = PipelineOptions(flags={})
    self.entries = []

  def add(self, entry: OptionsEntry) -> 'OptionsForm':
    """Appends an OptionsEntry to the form; returns the form for chaining."""
    self.entries.append(entry)
    return self

  def to_options(self) -> PipelineOptions:
    """Copies the current input values into self.options and returns it.

    Must only be called after display_for_input has created the input
    widgets; otherwise an AssertionError is raised (while asserts are
    enabled).
    """
    for entry in self.entries:
      assert entry.input, (
          'to_options invoked before display_for_input. '
          'Wrong usage.')
      view = self.options.view_as(entry.cls)
      value = entry.input.value
      if isinstance(entry.arg_builder, str):
        setattr(view, entry.arg_builder, value)
        continue
      for arg_name, convert in entry.arg_builder.items():
        setattr(view, arg_name, convert(value) if convert else value)
    self.additional_options()
    return self.options

  def additional_options(self):
    """Hook for subclasses to further adjust self.options; no-op here."""

  def display_for_input(self) -> 'OptionsForm':
    """Displays one grid row per entry: label, text input and help text.

    An entry that already has an input widget keeps that same widget (and the
    value it holds); otherwise a Text widget pre-filled with the entry's
    default (or '') is created. Afterwards display_actions is invoked.
    """
    from IPython.display import display
    from ipywidgets import GridBox
    from ipywidgets import Label
    from ipywidgets import Layout
    from ipywidgets import Text
    cells = []
    for entry in self.entries:
      label_cell = Label(value=entry.label)
      if not entry.input:
        entry.input = Text(value=entry.default or '')
      cells += [label_cell, entry.input, Label(value=entry.help)]
    display(GridBox(cells, layout=Layout(grid_template_columns='1fr 2fr 6fr')))
    self.display_actions()
    return self

  def display_actions(self):
    """Hook for subclasses to display action widgets (running pipelines,
    showing options, etc.); no-op here."""
class DataflowOptionsForm(OptionsForm):
  """A form to take inputs from users in IPython Notebooks to build
  PipelineOptions to run pipelines on Dataflow.

  Only contains minimum fields needed.
  """
  @staticmethod
  def _build_default_project() -> str:
    """Builds a default project id.

    Returns the second element of google.auth.default() (the project id), or
    the placeholder 'your-project-id' when that lookup raises anything other
    than KeyboardInterrupt/SystemExit.
    """
    try:
      # pylint: disable=c-extension-no-member
      import google.auth
      return google.auth.default()[1]
    except (KeyboardInterrupt, SystemExit):
      raise
    except Exception as e:
      _LOGGER.warning('There is some issue with your gcloud auth: %s', e)
      return 'your-project-id'

  @staticmethod
  def _build_req_file_from_pkgs(pkgs) -> Optional[str]:
    """Builds a requirements file that contains all additional PYPI packages
    needed.

    Args:
      pkgs: comma-separated package specs; may be empty or None.

    Returns:
      Path of a requirements file in a fresh temp directory with one package
      per line, or None when pkgs contains no non-blank package.
    """
    # Blank items (e.g. from a trailing comma or whitespace-only input) are
    # skipped so no empty requirement lines or empty files are produced.
    deps = [dep.strip() for dep in pkgs.split(',')
            if dep.strip()] if pkgs else []
    if not deps:
      return None
    req_file = os.path.join(
        tempfile.mkdtemp(prefix='beam-sql-dataflow-'), 'req.txt')
    with open(req_file, 'w') as f:
      f.write('\n'.join(deps) + '\n')
    return req_file

  def __init__(
      self,
      output_name: str,
      output_pcoll: beam.PCollection,
      verbose: bool = False):
    """Inits the OptionsForm for setting up Dataflow jobs.

    Args:
      output_name: name of the output PCollection. It is used in the GCS
        output path, the write transform's label and the names of the
        variables created in __main__.
      output_pcoll: the PCollection to write out. Its pipeline is the one
        submitted to Dataflow.
      verbose: whether to log additional information.
    """
    super().__init__()
    self.p = output_pcoll.pipeline
    self.output_name = output_name
    self.output_pcoll = output_pcoll
    self.verbose = verbose
    self.notice_shown = False
    self.add(
        OptionsEntry(
            label='Project Id',
            help='Name of the Cloud project owning the Dataflow job.',
            cls=GoogleCloudOptions,
            arg_builder='project',
            default=DataflowOptionsForm._build_default_project())
    ).add(
        OptionsEntry(
            label='Region',
            help='The Google Compute Engine region for creating Dataflow job.',
            cls=GoogleCloudOptions,
            arg_builder='region',
            default='us-central1')
    ).add(
        OptionsEntry(
            label='GCS Bucket',
            help=(
                'GCS path to stage code packages needed by workers and save '
                'temporary workflow jobs.'),
            cls=GoogleCloudOptions,
            arg_builder={
                # rstrip avoids a '//' in the derived paths when the bucket
                # is entered with a trailing slash.
                'staging_location': lambda x: x.rstrip('/') + '/staging',
                'temp_location': lambda x: x.rstrip('/') + '/temp'
            },
            default='gs://YOUR_GCS_BUCKET_HERE')
    ).add(
        OptionsEntry(
            label='Additional Packages',
            help=(
                'PYPI packages installed, comma-separated. If None, leave '
                'this field empty.'),
            cls=SetupOptions,
            arg_builder={
                'requirements_file': lambda x: DataflowOptionsForm.
                _build_req_file_from_pkgs(x)
            },
            default=''))

  def additional_options(self):
    """Ensures the Java SDK harness override is present exactly once.

    Any overrides already configured (e.g. by the user through the options
    variable) are kept. to_options is called several times per form, so this
    must be safe to run repeatedly.
    """
    # Use the latest Java SDK by default.
    override = '.*java.*,apache/beam_java11_sdk:latest'
    worker_options = self.options.view_as(WorkerOptions)
    sdk_overrides = worker_options.sdk_harness_container_image_overrides
    if not sdk_overrides:
      worker_options.sdk_harness_container_image_overrides = [override]
    elif override not in sdk_overrides:
      sdk_overrides.append(override)

  def display_actions(self):
    """Displays the "Run on Dataflow" and "Show Options" buttons.

    Also creates the options_<output_name> variable in __main__ (first-time
    notice logged once), and displays the output areas the buttons write to.
    """
    from IPython.display import HTML
    from IPython.display import display
    from ipywidgets import Button
    from ipywidgets import GridBox
    from ipywidgets import Layout
    from ipywidgets import Output
    options_output_area = Output()
    run_output_area = Output()
    run_btn = Button(
        description='Run on Dataflow',
        button_style='success',
        tooltip=(
            'Submit to Dataflow for execution with the configured options. The '
            'output PCollection\'s data will be written to the GCS bucket you '
            'configure.'))
    show_options_btn = Button(
        description='Show Options',
        button_style='info',
        tooltip='Show current pipeline options configured.')

    def _run_on_dataflow(btn):
      with run_output_area:
        run_output_area.clear_output()

        @progress_indicated
        def _inner():
          options = self.to_options()
          # Caches the output_pcoll to a GCS bucket.
          try:
            execution_count = 0
            if is_in_ipython():
              from IPython import get_ipython
              execution_count = get_ipython().execution_count
            output_location = '{}/{}'.format(
                options.view_as(GoogleCloudOptions).staging_location,
                self.output_name)
            _ = self.output_pcoll | 'WriteOuput{}_{}ToGCS'.format(
                self.output_name, execution_count) >> WriteToText(output_location)
            _LOGGER.info(
                'Data of output PCollection %s will be written to %s',
                self.output_name,
                output_location)
          except Exception as e:  # pylint: disable=broad-except
            # Best effort: presumably the transform has been added before, in
            # which case this is a noop. Logged so other failures are not
            # completely hidden.
            _LOGGER.debug(
                'Skipped adding the GCS write for output PCollection %s: %s',
                self.output_name,
                e)
          if self.verbose:
            _LOGGER.info(
                'Running the pipeline on Dataflow with pipeline options %s.',
                pformat_dict(options.display_data()))
          result = create_runner('DataflowRunner').run_pipeline(self.p, options)
          cloud_options = options.view_as(GoogleCloudOptions)
          url = (
              'https://console.cloud.google.com/dataflow/jobs/%s/%s?project=%s'
              % (cloud_options.region, result.job_id(), cloud_options.project))
          display(
              HTML(
                  'Click <a href="%s" target="_new">here</a> for the details '
                  'of your Dataflow job.' % url))
          result_name = 'result_{}'.format(self.output_name)
          create_var_in_main(result_name, result)
          if self.verbose:
            _LOGGER.info(
                'The pipeline result of the run can be accessed from variable '
                '%s. The current status is %s.',
                result_name,
                result)

        # Disable the button while submitting to prevent double submission.
        try:
          btn.disabled = True
          _inner()
        finally:
          btn.disabled = False

    run_btn.on_click(_run_on_dataflow)

    def _show_options(btn):
      with options_output_area:
        options_output_area.clear_output()
        options = self.to_options()
        options_name = 'options_{}'.format(self.output_name)
        create_var_in_main(options_name, options)
        _LOGGER.info(
            'The pipeline options configured is: %s.',
            pformat_dict(options.display_data()))

    show_options_btn.on_click(_show_options)
    grid = GridBox([run_btn, show_options_btn],
                   layout=Layout(grid_template_columns='repeat(2, 200px)'))
    display(grid)

    # Implicitly initializes the options variable before 1st time showing
    # options.
    options_name_inited, _ = create_var_in_main('options_{}'.format(
        self.output_name), self.to_options())
    if not self.notice_shown:
      _LOGGER.info(
          'The pipeline options can be configured through variable %s. You '
          'may also add additional options or sink transforms such as write '
          'to BigQuery in other notebook cells. Come back to click "Run on '
          'Dataflow" button once you complete additional configurations. '
          'Optionally, you can chain more beam_sql magics with DataflowRunner '
          'and click "Run on Dataflow" in their outputs.',
          options_name_inited)
      self.notice_shown = True

    display(options_output_area)
    display(run_output_area)