apache_beam.ml.rag.ingestion.mysql module

class apache_beam.ml.rag.ingestion.mysql.MySQLVectorWriterConfig(connection_config: apache_beam.ml.rag.ingestion.jdbc_common.ConnectionConfig, table_name: str, *, write_config: apache_beam.ml.rag.ingestion.jdbc_common.WriteConfig = WriteConfig(autosharding=None, max_connections=None, write_batch_size=None), column_specs: typing.List[apache_beam.ml.rag.ingestion.mysql_common.ColumnSpec] = [ColumnSpec(column_name='id', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_id_spec.<locals>.value_fn>, placeholder='?'), ColumnSpec(column_name='embedding', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_embedding_spec.<locals>.value_fn>, placeholder='string_to_vector(?)'), ColumnSpec(column_name='content', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_content_spec.<locals>.value_fn>, placeholder='?'), ColumnSpec(column_name='metadata', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_metadata_spec.<locals>.value_fn>, placeholder='?')], conflict_resolution: apache_beam.ml.rag.ingestion.mysql_common.ConflictResolution | None = None)

Bases: VectorDatabaseWriteConfig

Configuration for writing vectors to MySQL using JDBC.

Supports flexible schema configuration through column specifications and conflict resolution strategies with MySQL-specific syntax.

Parameters:
  • connection_config – ConnectionConfig.

  • table_name – Target table name.

  • write_config – JdbcIO WriteConfig to control batch sizes, autosharding, etc.

  • column_specs – Use ColumnSpecsBuilder to configure how embeddings and metadata are written to the database schema. If None, uses default Chunk schema with MySQL vector functions.

  • conflict_resolution – Optional ConflictResolution strategy for handling insert conflicts, expressed with MySQL's ON DUPLICATE KEY UPDATE syntax. None by default, meaning errors are thrown when attempting to insert duplicates.
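
To make the interplay between column placeholders and conflict resolution concrete, here is a minimal, standard-library-only sketch of how an INSERT statement might be assembled from (column name, placeholder) pairs plus an ON DUPLICATE KEY UPDATE clause. The helper `build_insert_sql` is hypothetical and is not part of Apache Beam; the SQL that the library actually generates may differ in detail.

```python
from typing import List, Optional, Tuple


def build_insert_sql(
    table_name: str,
    columns: List[Tuple[str, str]],
    update_fields: Optional[List[str]] = None,
) -> str:
    """Hypothetical helper: build a MySQL INSERT from (column, placeholder) pairs."""
    names = ", ".join(name for name, _ in columns)
    placeholders = ", ".join(placeholder for _, placeholder in columns)
    sql = f"INSERT INTO {table_name} ({names}) VALUES ({placeholders})"
    if update_fields:
        # MySQL upsert syntax: overwrite the listed columns when a key collides.
        updates = ", ".join(f"{field} = VALUES({field})" for field in update_fields)
        sql += f" ON DUPLICATE KEY UPDATE {updates}"
    return sql


# Column names and placeholders mirror the default column_specs in the signature.
default_columns = [
    ("id", "?"),
    ("embedding", "string_to_vector(?)"),
    ("content", "?"),
    ("metadata", "?"),
]
sql = build_insert_sql("embeddings", default_columns, ["embedding", "content"])
print(sql)
```

Without `update_fields`, the sketch emits a plain INSERT, which matches the documented default where inserting a duplicate key raises an error.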

Examples

Simple case with default schema:

>>> config = MySQLVectorWriterConfig(
...     connection_config=ConnectionConfig(...),
...     table_name='embeddings'
... )

Custom schema with metadata fields and MySQL functions:

>>> specs = (ColumnSpecsBuilder()
...         .with_id_spec(column_name="my_id_column")
...         .with_embedding_spec(
...             column_name="embedding_vec",
...             placeholder="string_to_vector(?)"
...         )
...         .add_metadata_field(field="source", column_name="src")
...         .add_metadata_field(
...             "timestamp",
...             column_name="created_at",
...             placeholder="STR_TO_DATE(?, '%Y-%m-%d %H:%i:%s')"
...         )
...         .build())
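
Both placeholders in this example receive a single string parameter, so the values have to be rendered as text before they are bound. The sketch below shows one way that text might look: a bracketed, comma-separated list for `string_to_vector(?)` and a timestamp matching the `STR_TO_DATE` format (MySQL's `%i` is minutes, which corresponds to `%M` in Python's `strftime`). The helper `embedding_to_vector_string` and the sample values are hypothetical and are not taken from Apache Beam.

```python
from datetime import datetime
from typing import Sequence


def embedding_to_vector_string(values: Sequence[float]) -> str:
    """Hypothetical helper: render floats as '[v1,v2,...]' for string_to_vector(?)."""
    return "[" + ",".join(str(float(v)) for v in values) + "]"


# Text bound to the string_to_vector(?) placeholder.
vector_text = embedding_to_vector_string([0.1, 0.2, 0.3])

# Text bound to STR_TO_DATE(?, '%Y-%m-%d %H:%i:%s').
timestamp_text = datetime(2024, 1, 2, 3, 4, 5).strftime("%Y-%m-%d %H:%M:%S")

print(vector_text)
print(timestamp_text)
```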

Minimal schema (only ID + embedding written):

>>> column_specs = (ColumnSpecsBuilder()
...     .with_id_spec()
...     .with_embedding_spec()
...     .build())
>>> config = MySQLVectorWriterConfig(
...     connection_config=ConnectionConfig(...),
...     table_name='embeddings',
...     column_specs=column_specs,
...     conflict_resolution=ConflictResolution(
...         on_conflict_fields=["id"],
...         action="UPDATE",
...         update_fields=["embedding"]  # only columns present in the minimal schema
...     )
... )

Using MySQL JSON functions:

>>> specs = (ColumnSpecsBuilder()
...     .with_id_spec()
...     .with_embedding_spec()
...     .with_metadata_spec(
...         column_name="metadata_json",
...         placeholder="CAST(? AS JSON)"
...     )
...     .build())
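
The `CAST(? AS JSON)` placeholder expects its parameter to be valid JSON text. As a rough illustration, assuming the metadata is a plain dictionary that is serialized before being bound, the value could be produced with the standard `json` module. The sample metadata below is hypothetical.

```python
import json

# Hypothetical chunk metadata; any JSON-serializable dict works the same way.
metadata = {"source": "docs/intro.md", "page": 3}

# Text bound to the CAST(? AS JSON) placeholder.
metadata_text = json.dumps(metadata)
print(metadata_text)
```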

create_write_transform() → PTransform