Class FileAwareFactoryFn<T>

java.lang.Object
org.apache.beam.sdk.extensions.kafka.factories.FileAwareFactoryFn<T>
Type Parameters:
T - The type of object this factory creates.
All Implemented Interfaces:
Serializable, ProcessFunction<Map<String,Object>,T>, SerializableFunction<Map<String,Object>,T>
Direct Known Subclasses:
KerberosConsumerFactoryFn

public abstract class FileAwareFactoryFn<T> extends Object implements SerializableFunction<Map<String,Object>,T>
An abstract SerializableFunction that serves as a base class for factories that need to process a configuration map to handle external resources like files and secrets.

This class is designed to be extended by concrete factory implementations (e.g., for creating Kafka consumers). It automates the process of detecting special URI strings within the configuration values and transforming them before passing the processed configuration to the subclass.

Supported Patterns:

  • External File Paths: It recognizes paths prefixed with schemes like gs:// or s3:// that are supported by the Beam FileSystems API. It downloads these files to a local temporary directory (under /tmp/<factory-type>/...) and replaces the original path in the configuration with the new local file path.
  • Secret Manager Values: It recognizes strings prefixed with secretValue:. It interprets the rest of the string as a Google Secret Manager secret version name (e.g., "projects/p/secrets/s/versions/v"), fetches the secret payload, and replaces the original secretValue:... identifier with the plain-text secret.
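The two rewriting rules can be illustrated with a minimal, self-contained Java sketch (Java 9+, because it uses Matcher.replaceAll(Function)). It is not the Beam implementation. The class ConfigRewriteSketch, its process method, and the injected download and fetchSecret callbacks are hypothetical. The sketch also assumes three things: paths and secret references may appear anywhere inside a string value (for example, inside a JAAS configuration), they end at whitespace, a quote, or a semicolon, and downloaded files are placed directly under /tmp/<factory-type>/ by file name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch of the rewriting rules, NOT the Beam implementation.
 * Downloading and secret lookup are injected so the logic runs without GCS or Secret Manager.
 */
final class ConfigRewriteSketch {
    // Assumption: a remote path ends at whitespace, a quote, or a semicolon.
    private static final Pattern REMOTE = Pattern.compile("(?:gs|s3)://[^\\s\"';]+");
    // "secretValue:" followed by a secret version name such as projects/p/secrets/s/versions/v.
    private static final Pattern SECRET = Pattern.compile("secretValue:([^\\s\"';]+)");

    static Map<String, Object> process(
            Map<String, Object> config,
            String factoryType,
            BiConsumer<String, String> download,     // (remotePath, localPath): copies the file
            Function<String, String> fetchSecret) {  // secret version name -> plain text
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : config.entrySet()) {
            Object v = e.getValue();
            if (v instanceof String) {
                String s = (String) v;
                // 1. Download each remote file under /tmp/<factoryType>/ and substitute the local path.
                s = REMOTE.matcher(s).replaceAll(m -> {
                    String remote = m.group();
                    String name = remote.substring(remote.lastIndexOf('/') + 1);
                    String local = "/tmp/" + factoryType + "/" + name;
                    download.accept(remote, local);
                    return Matcher.quoteReplacement(local);
                });
                // 2. Replace each secretValue:<name> reference with the fetched plain text.
                s = SECRET.matcher(s).replaceAll(
                        m -> Matcher.quoteReplacement(fetchSecret.apply(m.group(1))));
                v = s;
            }
            // Non-string values (e.g. integers) pass through unchanged.
            out.put(e.getKey(), v);
        }
        return out;
    }
}
```

Injecting the download and secret-lookup steps keeps the rewriting logic testable without cloud credentials. A real factory would call the Beam FileSystems API and a Secret Manager client at those points.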

Usage:

A subclass must implement the createObject(Map) method, which receives the fully processed configuration map with all paths localized and secrets resolved. Subclasses can also override downloadAndProcessExtraFiles() to handle specific preliminary file downloads (e.g., a krb5.conf file) before the main configuration processing begins.
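This contract follows the template-method pattern. The sketch below uses a hypothetical stand-in base class, SketchFactoryFn, that mirrors the documented methods so that it runs without Beam. It implements java.util.function.Function instead of SerializableFunction. Several details are assumptions for illustration: the call order inside apply (hook first, then rewriting, then createObject), the wrapping of IOException in UncheckedIOException, and the subclass KerberosLikeFactoryFn.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in mirroring FileAwareFactoryFn's documented methods; not Beam code. */
abstract class SketchFactoryFn<T> implements Function<Map<String, Object>, T> {
    protected final String factoryType;

    protected SketchFactoryFn(String factoryType) {
        this.factoryType = factoryType;
    }

    /** Receives the config with paths localized and secrets resolved. */
    protected abstract T createObject(Map<String, Object> config);

    /** Optional hook for preliminary downloads; the default does nothing. */
    protected void downloadAndProcessExtraFiles() throws IOException {}

    @Override
    public T apply(Map<String, Object> config) {
        try {
            downloadAndProcessExtraFiles();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Placeholder: the real class localizes paths and resolves secrets at this step.
        Map<String, Object> processed = new HashMap<>(config);
        return createObject(processed);
    }
}

/** Hypothetical subclass: prepares krb5.conf, then builds its product from the config. */
final class KerberosLikeFactoryFn extends SketchFactoryFn<String> {
    KerberosLikeFactoryFn() {
        super("kerberos");
    }

    @Override
    protected void downloadAndProcessExtraFiles() {
        // A real subclass would download krb5.conf first; here we only point the JDK at it.
        System.setProperty("java.security.krb5.conf", "/tmp/" + factoryType + "/krb5.conf");
    }

    @Override
    protected String createObject(Map<String, Object> config) {
        // Stand-in for constructing e.g. a Kafka consumer; returns a description instead.
        return "consumer for " + config.get("bootstrap.servers");
    }
}
```

Keeping apply in the base class means every subclass gets the same preprocessing. Subclasses only decide what to build and which extra files to prepare.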

  • Constructor Details

    • FileAwareFactoryFn

      public FileAwareFactoryFn(String factoryType)
  • Method Details

    • createObject

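      protected abstract T createObject(Map<String,Object> config)
      Creates the target object from the fully processed configuration map, in which all external file paths have been localized and all secrets resolved. Subclasses must implement this method.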
    • apply

      public T apply(Map<String,Object> config)
      Description copied from interface: SerializableFunction
      Returns the result of invoking this function on the given input.
      Specified by:
      apply in interface ProcessFunction<Map<String,Object>,T>
      Specified by:
      apply in interface SerializableFunction<Map<String,Object>,T>
    • downloadExternalFile

      protected static String downloadExternalFile(String externalFilePath, String outputFileString) throws IOException
      Downloads a file from its external storage path and copies it to the given local file path. The local file path is computed by replacePathWithLocal.
      Parameters:
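      externalFilePath - the path of the file in external storage, such as a gs:// or s3:// URI
      outputFileString - the local file path to copy the file to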
      Returns:
      Throws:
      IOException
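A minimal sketch of the copy step, assuming the remote file has already been opened as a ReadableByteChannel. In Beam such a channel would plausibly come from FileSystems.open(...), which returns a ReadableByteChannel. DownloadSketch.copyToLocal is a hypothetical helper, not the actual downloadExternalFile. Returning the local path is also an assumption, since the return value is not documented here.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

/** Hypothetical sketch of copying an already-opened remote file to a local path. */
final class DownloadSketch {
    static String copyToLocal(ReadableByteChannel source, String outputFileString)
            throws IOException {
        Path target = Paths.get(outputFileString);
        Path parent = target.getParent();
        if (parent != null) {
            // The per-factory directory (e.g. /tmp/<factory-type>/) may not exist yet.
            Files.createDirectories(parent);
        }
        // Closing the stream also closes the underlying channel.
        try (InputStream in = Channels.newInputStream(source)) {
            // Overwrite any stale copy left by an earlier run on the same worker.
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        }
        return target.toString();
    }
}
```

Using REPLACE_EXISTING keeps repeated invocations on the same worker idempotent rather than failing on an existing file.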
    • getSecretWithCache

      protected byte[] getSecretWithCache(String secretId)
    • downloadAndProcessExtraFiles

      protected void downloadAndProcessExtraFiles() throws IOException
      A hook for subclasses to download and process specific files before the main configuration is handled. For example, the Kerberos factory can use this hook to download a krb5.conf file and set a system property.
      Throws:
      IOException
    • getBaseDirectory

      protected String getBaseDirectory()
    • getSecret

      protected byte[] getSecret(String secretVersion)
    • processSecret

      protected String processSecret(String originalValue, String secretId, byte[] secretValue)