apache_beam.ml.rag.chunking.base module

class apache_beam.ml.rag.chunking.base.ChunkingTransformProvider(chunk_id_fn: Callable[[Chunk], str] | None = None)[source]

Bases: MLTransformProvider

Base class for chunking transforms in RAG pipelines.

ChunkingTransformProvider defines the interface for splitting documents into chunks for embedding and retrieval. Implementations should define how to split content while preserving metadata and managing chunk IDs.

The transform flow:

- Takes input documents with content and metadata
- Splits content into chunks using implementation-specific logic
- Preserves document metadata in the resulting chunks
- Optionally assigns unique IDs to chunks (configurable via chunk_id_fn)

Example usage:
>>> class MyChunker(ChunkingTransformProvider):
...     def get_splitter_transform(self):
...         return beam.ParDo(MySplitterDoFn())
...
>>> chunker = MyChunker(chunk_id_fn=my_id_function)
>>>
>>> with beam.Pipeline() as p:
...     chunks = (
...         p
...         | beam.Create([{'text': 'document...', 'source': 'doc.txt'}])
...         | MLTransform(...).with_transform(chunker))
Parameters:

chunk_id_fn – Optional function used to generate chunk IDs. If not provided, random UUIDs are used. The function should take a Chunk and return a str.
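
For illustration, a deterministic chunk_id_fn might hash the chunk text together with its source document. The sketch below is not part of this module; it assumes the Chunk fields content.text and metadata from apache_beam.ml.rag.types:

import hashlib

from apache_beam.ml.rag.types import Chunk


def deterministic_chunk_id(chunk: Chunk) -> str:
    # Derive a stable ID from the source document and chunk text so that
    # re-running the pipeline assigns the same IDs (hypothetical scheme).
    source = chunk.metadata.get('source', '')
    text = chunk.content.text or ''
    return hashlib.sha256(f'{source}:{text}'.encode('utf-8')).hexdigest()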

abstract get_splitter_transform() PTransform[PCollection[Dict[str, Any]], PCollection[Chunk]][source]

Creates a transform that emits splits for the given content.
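
A minimal sketch of a concrete implementation, assuming a hypothetical fixed-size character splitter and the Chunk/Content types from apache_beam.ml.rag.types (the input dict layout matches the example above):

from typing import Any, Dict, Iterable

import apache_beam as beam
from apache_beam.ml.rag.chunking.base import ChunkingTransformProvider
from apache_beam.ml.rag.types import Chunk, Content


class FixedSizeSplitterDoFn(beam.DoFn):
    """Splits the 'text' field of each input dict into fixed-size pieces."""

    def __init__(self, chunk_size: int = 200):
        self._chunk_size = chunk_size

    def process(self, element: Dict[str, Any]) -> Iterable[Chunk]:
        text = element['text']
        # Everything except the text is carried along as chunk metadata.
        metadata = {k: v for k, v in element.items() if k != 'text'}
        for i in range(0, len(text), self._chunk_size):
            yield Chunk(
                content=Content(text=text[i:i + self._chunk_size]),
                index=i // self._chunk_size,
                metadata=metadata)


class FixedSizeChunker(ChunkingTransformProvider):
    def get_splitter_transform(self):
        return beam.ParDo(FixedSizeSplitterDoFn())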

get_ptransform_for_processing(**kwargs) PTransform[PCollection[Dict[str, Any]], PCollection[Chunk]][source]

Creates a transform for processing documents into chunks.
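
Since this method returns a PTransform from dicts to Chunks, it can also be applied directly to a PCollection of document dicts outside of MLTransform. A minimal usage sketch, reusing the hypothetical FixedSizeChunker defined above:

import apache_beam as beam

with beam.Pipeline() as p:
    chunks = (
        p
        | beam.Create([{'text': 'a long document...', 'source': 'doc.txt'}])
        | FixedSizeChunker().get_ptransform_for_processing())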