apache_beam.ml.rag.ingestion.mysql module
- class apache_beam.ml.rag.ingestion.mysql.MySQLVectorWriterConfig(connection_config: ~apache_beam.ml.rag.ingestion.jdbc_common.ConnectionConfig, table_name: str, *, write_config: ~apache_beam.ml.rag.ingestion.jdbc_common.WriteConfig = WriteConfig(autosharding=None, max_connections=None, write_batch_size=None), column_specs: ~typing.List[~apache_beam.ml.rag.ingestion.mysql_common.ColumnSpec] = [ColumnSpec(column_name='id', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_id_spec.<locals>.value_fn>, placeholder='?'), ColumnSpec(column_name='embedding', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_embedding_spec.<locals>.value_fn>, placeholder='string_to_vector(?)'), ColumnSpec(column_name='content', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_content_spec.<locals>.value_fn>, placeholder='?'), ColumnSpec(column_name='metadata', python_type=<class 'str'>, value_fn=<function ColumnSpecsBuilder.with_metadata_spec.<locals>.value_fn>, placeholder='?')], conflict_resolution: ~apache_beam.ml.rag.ingestion.mysql_common.ConflictResolution | None = None)[source]
Bases: VectorDatabaseWriteConfig
Configuration for writing vectors to MySQL using JDBC.
Supports flexible schema configuration through column specifications and conflict resolution strategies with MySQL-specific syntax.
- Parameters:
  - connection_config – ConnectionConfig.
  - table_name – Target table name.
  - write_config – JdbcIO WriteConfig to control batch sizes, autosharding, etc.
  - column_specs – Use ColumnSpecsBuilder to configure how embeddings and metadata are written to the database schema. If None, uses default Chunk schema with MySQL vector functions.
  - conflict_resolution – Optional ConflictResolution strategy for handling insert conflicts, expressed with MySQL's ON DUPLICATE KEY UPDATE syntax. None by default, meaning an error is raised when a duplicate is inserted.
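To make the relationship between column placeholders and conflict resolution concrete, here is a minimal sketch of how such a configuration could translate into a MySQL statement. It is an illustration only, not Beam's actual implementation: `SketchColumn` and `build_insert_sql` are hypothetical names, and the SQL that the real writer generates may differ.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SketchColumn:
    """Hypothetical stand-in for a column spec: a name plus its SQL placeholder."""
    column_name: str
    placeholder: str = "?"


def build_insert_sql(
    table_name: str,
    columns: List[SketchColumn],
    update_fields: Optional[List[str]] = None,
) -> str:
    """Assemble an INSERT statement (illustrative, not Beam's own code)."""
    names = ", ".join(c.column_name for c in columns)
    values = ", ".join(c.placeholder for c in columns)
    sql = f"INSERT INTO {table_name} ({names}) VALUES ({values})"
    if update_fields:
        # MySQL resolves key conflicts with ON DUPLICATE KEY UPDATE.
        # VALUES(col) is the long-standing form; recent MySQL versions
        # also support row aliases for the same purpose.
        updates = ", ".join(f"{f} = VALUES({f})" for f in update_fields)
        sql += f" ON DUPLICATE KEY UPDATE {updates}"
    return sql


# Columns mirroring the default schema shown in the class signature.
default_columns = [
    SketchColumn("id"),
    SketchColumn("embedding", "string_to_vector(?)"),
    SketchColumn("content"),
    SketchColumn("metadata"),
]

plain_sql = build_insert_sql("embeddings", default_columns)
upsert_sql = build_insert_sql(
    "embeddings", default_columns, update_fields=["embedding", "content"])
print(plain_sql)
print(upsert_sql)
```

Without `update_fields` the statement is a plain INSERT, so a duplicate key produces an error, which corresponds to the default `conflict_resolution=None` behaviour described above.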
Examples
Simple case with default schema:
>>> config = MySQLVectorWriterConfig(
...     connection_config=ConnectionConfig(...),
...     table_name='embeddings'
... )
Custom schema with metadata fields and MySQL functions:
>>> specs = (ColumnSpecsBuilder()
...     .with_id_spec(column_name="my_id_column")
...     .with_embedding_spec(
...         column_name="embedding_vec",
...         placeholder="string_to_vector(?)"
...     )
...     .add_metadata_field(field="source", column_name="src")
...     .add_metadata_field(
...         "timestamp",
...         column_name="created_at",
...         placeholder="STR_TO_DATE(?, '%Y-%m-%d %H:%i:%s')"
...     )
...     .build())
Minimal schema (only ID + embedding written):
>>> column_specs = (ColumnSpecsBuilder()
...     .with_id_spec()
...     .with_embedding_spec()
...     .build())
>>> config = MySQLVectorWriterConfig(
...     connection_config=ConnectionConfig(...),
...     table_name='embeddings',
...     column_specs=specs,
...     conflict_resolution=ConflictResolution(
...         on_conflict_fields=["id"],
...         action="UPDATE",
...         update_fields=["embedding", "content"]
...     )
... )
Using MySQL JSON functions:
>>> specs = (ColumnSpecsBuilder()
...     .with_id_spec()
...     .with_embedding_spec()
...     .with_metadata_spec(
...         column_name="metadata_json",
...         placeholder="CAST(? AS JSON)"
...     )
...     .build())
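The default column specs in the class signature all declare `python_type` as `str`, which suggests that embeddings and metadata are bound as text and converted by the SQL placeholder (`string_to_vector(?)`, `CAST(? AS JSON)`). The sketch below shows one plausible way to produce such text parameters. The helpers `embedding_to_text` and `metadata_to_json` are hypothetical, and the exact serialization used by Beam's built-in `value_fn` functions is an assumption here.

```python
import json
from typing import Any, Dict, List


def embedding_to_text(values: List[float]) -> str:
    """Render a dense embedding as '[v1,v2,...]' text (hypothetical helper).

    This is the bracketed list form that a placeholder such as
    string_to_vector(?) would then parse on the MySQL side.
    """
    return "[" + ",".join(str(v) for v in values) + "]"


def metadata_to_json(metadata: Dict[str, Any]) -> str:
    """Serialize metadata to JSON text, suitable for CAST(? AS JSON)."""
    # sort_keys keeps the output deterministic across runs.
    return json.dumps(metadata, sort_keys=True)


embedding_param = embedding_to_text([0.1, 0.2, 0.3])
metadata_param = metadata_to_json({"source": "wiki", "page": 3})
print(embedding_param)
print(metadata_param)
```

Keeping the bound value a plain string and doing the conversion inside the placeholder means the same parameter type can be used for every column, with the database-specific function deciding the stored type.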
- create_write_transform() → PTransform