Anomaly Detection Example
The anomaly detection example demonstrates how to set up an anomaly detection pipeline that reads text from Pub/Sub in real time, and then detects anomalies using a trained HDBSCAN clustering model.
Dataset for Anomaly Detection
This example uses a dataset called emotion that contains 20,000 English Twitter messages with 6 basic emotions: anger, fear, joy, love, sadness, and surprise. The dataset has three splits: train (for training), validation, and test (for performance evaluation). Because each message is paired with its emotion category (class), it is a supervised dataset. You can use the Hugging Face datasets page to access this dataset.
The following text shows examples from the train split of the dataset:
Text | Type of emotion |
---|---|
im grabbing a minute to post i feel greedy wrong | Anger |
i am ever feeling nostalgic about the fireplace i will know that it is still on the property | Love |
ive been taking or milligrams or times recommended amount and ive fallen asleep a lot faster but i also feel like so funny | Fear |
on a boat trip to denmark | Joy |
i feel you know basically like a fake in the realm of science fiction | Sadness |
i began having them several times a week feeling tortured by the hallucinations moving people and figures sounds and vibrations | Fear |
Anomaly Detection Algorithm
HDBSCAN is a clustering algorithm that extends DBSCAN by converting it into a hierarchical clustering algorithm and then extracting a flat clustering based on the stability of clusters. Once trained, the model predicts -1 if a new data point is an outlier; otherwise, it predicts one of the existing clusters.
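The label convention above can be sketched in a few lines. This is a minimal illustration, not part of the example's code: the function name split_by_label and the sample labels are hypothetical, and the only fact taken from the text is that -1 marks an outlier.

```python
OUTLIER_LABEL = -1  # label the trained model assigns to outliers


def split_by_label(labels):
    """Return (outlier positions, positions grouped by cluster label)."""
    outliers = []
    clusters = {}
    for position, label in enumerate(labels):
        if label == OUTLIER_LABEL:
            outliers.append(position)
        else:
            clusters.setdefault(label, []).append(position)
    return outliers, clusters


# Hypothetical predictions for five incoming messages.
outliers, clusters = split_by_label([0, 2, -1, 0, -1])
print(outliers)  # [2, 4]
print(clusters)  # {0: [0, 3], 2: [1]}
```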
Ingestion to Pub/Sub
Ingest the data into Pub/Sub so that while clustering, the model can read the tweets from Pub/Sub. Pub/Sub is a messaging service for exchanging event data among applications and services. Streaming analytics and data integration pipelines use Pub/Sub to ingest and distribute data.
You can see the full example code for ingesting data into Pub/Sub on GitHub.
The file structure for the ingestion pipeline is shown in the following diagram:
write_data_to_pubsub_pipeline/
├── pipeline/
│ ├── __init__.py
│ ├── options.py
│ └── utils.py
├── __init__.py
├── config.py
├── main.py
└── setup.py
- pipeline/utils.py contains the code for loading the emotion dataset and two beam.DoFn classes that are used for data transformation.
- pipeline/options.py contains the pipeline options to configure the Dataflow pipeline.
- config.py defines variables that are used multiple times, like the Google Cloud PROJECT_ID and NUM_WORKERS.
- setup.py defines the packages and requirements for the pipeline to run.
- main.py contains the pipeline code and additional functions used for running the pipeline.
Run the Pipeline
To run the pipeline, first install the required packages. For this example, you need access to a Google Cloud project, and you need to configure the Google Cloud variables, like PROJECT_ID, REGION, Pub/Sub TOPIC_ID, and others in the config.py file.
- Locally on your machine:
python main.py
- On GCP for Dataflow:
python main.py --mode cloud
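One way main.py could interpret the --mode flag is sketched below. The flag name and the value cloud come from the commands above; the default value local, the function name parse_mode, and the use of argparse are assumptions, because the text does not show how main.py parses its arguments.

```python
import argparse


def parse_mode(argv=None):
    """Parse the --mode flag; 'local' is an assumed default."""
    parser = argparse.ArgumentParser(description="Run the pipeline.")
    parser.add_argument(
        "--mode",
        choices=["local", "cloud"],
        default="local",
        help="Run locally or on Dataflow.",
    )
    # parse_known_args leaves any other pipeline options untouched.
    args, _ = parser.parse_known_args(argv)
    return args.mode


print(parse_mode([]))                   # local
print(parse_mode(["--mode", "cloud"]))  # cloud
```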
The write_data_to_pubsub_pipeline contains four different transforms:
- Load the emotion dataset using Hugging Face datasets (for simplicity, we take samples from three classes instead of six).
- Associate each piece of text with a unique identifier (UID).
- Convert the text into the format that Pub/Sub expects.
- Write the formatted message to Pub/Sub.
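The second and third transforms can be sketched without any Beam or Pub/Sub dependency. The helper names assign_uid and to_pubsub_message are hypothetical, and carrying the UID in a message attribute named id is an assumption; the example's actual beam.DoFn classes live in pipeline/utils.py and are not shown here. What the sketch relies on is that a Pub/Sub message carries a bytes payload plus optional string attributes.

```python
import uuid


def assign_uid(text):
    """Pair a piece of text with a freshly generated unique identifier."""
    return {"id": str(uuid.uuid4()), "text": text}


def to_pubsub_message(record):
    """Build the payload and attributes that a Pub/Sub message carries.

    Pub/Sub payloads are bytes and attributes are string-to-string maps.
    Putting the UID in an attribute named "id" is an assumption.
    """
    return {
        "data": record["text"].encode("utf-8"),
        "attributes": {"id": record["id"]},
    }


record = assign_uid("on a boat trip to denmark")
message = to_pubsub_message(record)
print(message["data"])  # b'on a boat trip to denmark'
```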
Anomaly Detection on Streaming Data
After ingesting the data to Pub/Sub, run the anomaly detection pipeline. This pipeline reads the streaming message from Pub/Sub, converts the text to an embedding using a language model, and feeds the embedding to an already trained clustering model to predict whether the message is an anomaly. One prerequisite for this pipeline is to have an HDBSCAN clustering model trained on the training split of the dataset.
You can find the full example code for anomaly detection on GitHub.
The following diagram shows the file structure for the anomaly_detection pipeline:
anomaly_detection_pipeline/
├── pipeline/
│ ├── __init__.py
│ ├── options.py
│ └── transformations.py
├── __init__.py
├── config.py
├── main.py
└── setup.py
- pipeline/transformations.py contains the code for different beam.DoFn classes and additional functions that are used in the pipeline.
- pipeline/options.py contains the pipeline options to configure the Dataflow pipeline.
- config.py defines variables that are used multiple times, like the Google Cloud PROJECT_ID and NUM_WORKERS.
- setup.py defines the packages and requirements for the pipeline to run.
- main.py contains the pipeline code and additional functions used to run the pipeline.
Run the Pipeline
Install the required packages and push the data to Pub/Sub. For this example, you need access to a Google Cloud project, and you need to configure the Google Cloud variables, like PROJECT_ID, REGION, Pub/Sub SUBSCRIPTION_ID, and others in the config.py file.
- Locally on your machine:
python main.py
- On GCP for Dataflow:
python main.py --mode cloud
The pipeline includes the following steps:
- Read the message from Pub/Sub.
- Convert the Pub/Sub message into a PCollection of dictionaries where the key is the UID and the value is the Twitter text.
- Encode the text into transformer-readable token ID integers using a tokenizer.
- Use RunInference to get the vector embedding from a transformer-based language model.
- Normalize the embedding.
- Use RunInference to get anomaly prediction from a trained HDBSCAN clustering model.
- Write the prediction to BigQuery so that the clustering model can be retrained when needed.
- Send an email alert if an anomaly is detected.
The following code snippet shows the first two steps of the pipeline:
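As a rough, dependency-free illustration of the second step, the sketch below turns a received message into a dictionary with text and id keys, the shape that tokenize_sentence reads later in this section. The FakeMessage class is a hypothetical stand-in for a Pub/Sub message, and reading the UID from an attribute named id is an assumption; in the pipeline itself this logic would sit inside a beam.DoFn.

```python
from dataclasses import dataclass, field


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a Pub/Sub message (bytes + attributes)."""
    data: bytes
    attributes: dict = field(default_factory=dict)


def decode_message(message):
    """Convert a message into the {"text", "id"} dictionary used downstream."""
    return {
        "text": message.data.decode("utf-8"),
        "id": message.attributes["id"],  # assumed attribute name
    }


decoded = decode_message(
    FakeMessage(data=b"on a boat trip to denmark", attributes={"id": "42"})
)
print(decoded)  # {'text': 'on a boat trip to denmark', 'id': '42'}
```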
The next section describes the following pipeline steps:
- Tokenizing the text
- Getting embedding using RunInference
- Getting predictions from the HDBSCAN model
Get Embedding from a Language Model
To cluster text data, first map the text into vectors of numerical values suitable for statistical analysis. This example uses a transformer-based language model called sentence-transformers/stsb-distilbert-base. This model maps sentences and paragraphs to a 768-dimensional dense vector space, and you can use it for tasks like clustering or semantic search.
Because the language model is expecting a tokenized input instead of raw text, start by tokenizing the text. Tokenization is a preprocessing task that transforms text so that it can be fed into the model for getting predictions.
The tokenize_sentence function takes a dictionary with a text and an ID, tokenizes the text, and returns a tuple of the text and ID together with the tokenized output.
import torch
from transformers import AutoTokenizer

import config as cfg  # assumed to be the example's config.py

Tokenizer = AutoTokenizer.from_pretrained(cfg.TOKENIZER_NAME)


def tokenize_sentence(input_dict):
    """
    Takes a dictionary with a text and an id, tokenizes the text, and
    returns a tuple of the text and id and the tokenized text

    Args:
        input_dict: a dictionary with the text and id of the sentence

    Returns:
        A tuple of the text and id, and a dictionary of the tokens.
    """
    text, uid = input_dict["text"], input_dict["id"]
    tokens = Tokenizer([text], padding=True, truncation=True, return_tensors="pt")
    # Drop the leading batch dimension so each value is a 1-D tensor.
    tokens = {key: torch.squeeze(val) for key, val in tokens.items()}
    return (text, uid), tokens
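To make the tokenizer output more concrete, the toy sketch below mimics the two fields a transformer tokenizer typically returns, input_ids and attention_mask, together with truncation and padding. It is not the Hugging Face tokenizer: the whitespace splitting, the tiny vocabulary, the reserved IDs 0 and 1, and the function name toy_tokenize are all assumptions made for illustration.

```python
PAD_ID = 0  # assumed ID for padding
UNK_ID = 1  # assumed ID for words missing from the vocabulary


def toy_tokenize(text, vocab, max_length):
    """Map words to integer IDs, then truncate and pad to max_length."""
    ids = [vocab.get(word, UNK_ID) for word in text.lower().split()]
    ids = ids[:max_length]                      # truncation
    mask = [1] * len(ids)                       # 1 marks a real token
    padding = max_length - len(ids)
    return {
        "input_ids": ids + [PAD_ID] * padding,  # padding
        "attention_mask": mask + [0] * padding,
    }


vocab = {"i": 2, "feel": 3, "greedy": 4}
tokens = toy_tokenize("i feel greedy wrong", vocab, max_length=6)
print(tokens["input_ids"])       # [2, 3, 4, 1, 0, 0]
print(tokens["attention_mask"])  # [1, 1, 1, 1, 0, 0]
```

The attention mask matters downstream: the mean_pooling method in ModelWrapper uses it to leave padding positions out of the average.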
The tokenized output is then passed to the language model to get the embeddings. To get embeddings from the language model, we use RunInference() from Apache Beam with a model handler named embedding_model_handler.
We define PytorchNoBatchModelHandler as a wrapper around PytorchModelHandlerKeyedTensor to limit the batch size to one.
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor


# Can be removed once: https://github.com/apache/beam/issues/21863 is fixed
class PytorchNoBatchModelHandler(PytorchModelHandlerKeyedTensor):
    """Wrapper to PytorchModelHandler to limit batch size to 1.

    The tokenized strings generated from BertTokenizer may have different
    lengths, which doesn't work with torch.stack() in current RunInference
    implementation since stack() requires tensors to be the same size.
    Restricting max_batch_size to 1 means there is only 1 example per `batch`
    in the run_inference() call.
    """
    def batch_elements_kwargs(self):
        return {"max_batch_size": 1}
Because the forward() method of DistilBertModel doesn't return the embeddings, we define a custom model_class, ModelWrapper, to get the vector embedding.
import torch
from transformers import DistilBertModel


class ModelWrapper(DistilBertModel):
    """Wrapper to DistilBertModel to get embeddings when calling
    forward function."""
    def forward(self, **kwargs):
        output = super().forward(**kwargs)
        sentence_embedding = (
            self.mean_pooling(output,
                              kwargs["attention_mask"]).detach().cpu().numpy())
        return sentence_embedding

    # Mean Pooling - Take attention mask into account for correct averaging
    def mean_pooling(self, model_output, attention_mask):
        """
        Calculates the mean of token embeddings

        Args:
            model_output: The output of the model.
            attention_mask: This is a tensor that contains 1s for all input
                tokens and 0s for all padding tokens.

        Returns:
            The mean of the token embeddings.
        """
        # First element of model_output contains all token embeddings
        token_embeddings = model_output[0]
        input_mask_expanded = (
            attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float())
        return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
            input_mask_expanded.sum(1), min=1e-9)
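The masked average computed by mean_pooling can be followed by hand with plain Python lists. This sketch works on a single sequence rather than a batched tensor, and the function name masked_mean and the sample numbers are made up for illustration.

```python
def masked_mean(token_embeddings, attention_mask):
    """Average token vectors, counting only positions where the mask is 1."""
    dim = len(token_embeddings[0])
    totals = [0.0] * dim
    for vector, keep in zip(token_embeddings, attention_mask):
        for i in range(dim):
            totals[i] += vector[i] * keep
    # Same guard as torch.clamp(..., min=1e-9): avoid dividing by zero.
    count = max(sum(attention_mask), 1e-9)
    return [total / count for total in totals]


# Two real tokens followed by one padding token.
embeddings = [[1.0, 2.0], [3.0, 4.0], [9.0, 9.0]]
mask = [1, 1, 0]
print(masked_mean(embeddings, mask))  # [2.0, 3.0]
```

Without the mask, the padding vector [9.0, 9.0] would pull the average away from the real tokens.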
After getting the embedding for each piece of Twitter text, the embeddings are normalized, because the trained model is expecting normalized embeddings.
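The text does not say which normalization is applied. A common choice before distance-based clustering is L2 normalization, which rescales each vector to unit length; the sketch below assumes that choice, and the function name l2_normalize is hypothetical.

```python
import math


def l2_normalize(vector, eps=1e-12):
    """Scale a vector to unit Euclidean length (eps guards a zero vector)."""
    norm = math.sqrt(sum(x * x for x in vector))
    return [x / max(norm, eps) for x in vector]


unit = l2_normalize([3.0, 4.0])
print(unit)  # [0.6, 0.8]
```

After this step every embedding has length 1, so distances between embeddings depend only on their direction, not on their magnitude.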
Get Predictions
The normalized embeddings are then forwarded to the trained HDBSCAN model to get the predictions. This step uses a model handler named clustering_model_handler.
We define CustomSklearnModelHandlerNumpy as a wrapper around SklearnModelHandlerNumpy to limit the batch size to one and to override run_inference so that hdbscan.approximate_predict() is used to get anomaly predictions.
import hdbscan
import numpy as np
from apache_beam.ml.inference.base import PredictionResult
# _validate_inference_args is a private helper; importing it from this module
# is an assumption and may depend on the Beam version.
from apache_beam.ml.inference.sklearn_inference import (
    SklearnModelHandlerNumpy, _validate_inference_args)


class CustomSklearnModelHandlerNumpy(SklearnModelHandlerNumpy):
    # limit batch size to 1 can be removed once: https://github.com/apache/beam/issues/21863 is fixed
    def batch_elements_kwargs(self):
        """Limit batch size to 1 for inference"""
        return {"max_batch_size": 1}

    # run_inference can be removed once: https://github.com/apache/beam/issues/22572 is fixed
    def run_inference(self, batch, model, inference_args=None):
        """Runs inferences on a batch of numpy arrays.

        Args:
            batch: A sequence of examples as numpy arrays. They should
                be single examples.
            model: A numpy model or pipeline. Must implement predict(X).
                Where the parameter X is a numpy array.
            inference_args: Any additional arguments for an inference.

        Returns:
            An Iterable of type PredictionResult.
        """
        _validate_inference_args(inference_args)
        vectorized_batch = np.vstack(batch)
        # approximate_predict returns a (labels, strengths) tuple. With a
        # batch of one, zip pairs the single example with the labels array.
        predictions = hdbscan.approximate_predict(model, vectorized_batch)
        return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
After getting the model predictions, decode the output from RunInference into a dictionary. Next, store the prediction in a BigQuery table, where it can be used for analysis and for retraining the HDBSCAN model, and send an email alert if the prediction is an anomaly.
_ = (
    predictions
    | "Decode Prediction" >> beam.ParDo(DecodePrediction())
    | "Write to BQ" >> beam.io.WriteToBigQuery(
        table=cfg.TABLE_URI,
        schema=cfg.TABLE_SCHEMA,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    ))

_ = predictions | "Alert by Email" >> beam.ParDo(TriggerEmailAlert())
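DecodePrediction and TriggerEmailAlert are not shown in this section. The sketch below illustrates the kind of logic they might contain using plain functions: the names decode_prediction and needs_alert, the row field names, and the shape of the input tuple are all assumptions. The only rule taken from the text is that a label of -1 marks an anomaly.

```python
def decode_prediction(key, label):
    """Flatten a ((text, uid), label) result into a row-like dictionary."""
    text, uid = key
    return {"id": uid, "text": text, "cluster": int(label)}


def needs_alert(row):
    """An alert is warranted only when the row is labeled as an outlier."""
    return row["cluster"] == -1


rows = [
    decode_prediction(("on a boat trip to denmark", "a1"), 3),
    decode_prediction(("text unlike anything seen in training", "b2"), -1),
]
alerts = [row["id"] for row in rows if needs_alert(row)]
print(alerts)  # ['b2']
```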
Last updated on 2025/01/19