Anomaly Detection Example

This example demonstrates how to set up an anomaly detection pipeline that reads text from Pub/Sub in real time and then detects anomalies by using a trained HDBSCAN clustering model.

Dataset for Anomaly Detection

This example uses a dataset called emotion, which contains 20,000 English Twitter messages labeled with six basic emotions: anger, fear, joy, love, sadness, and surprise. The dataset has three splits: train (for training), validation, and test (for performance evaluation). Because each entry contains both the text and its category (class), it is a supervised dataset. You can use the Hugging Face datasets page to access this dataset.
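If you want to inspect the dataset before building the pipeline, a few lines with the Hugging Face datasets library are enough. This is a minimal sketch; dair-ai/emotion is the dataset's identifier on the Hugging Face Hub, and the split sizes in the comment come from the dataset card:

    from datasets import load_dataset

    # Load the emotion dataset from the Hugging Face Hub.
    dataset = load_dataset("dair-ai/emotion")
    print(dataset)  # splits: train (16,000), validation (2,000), test (2,000)
    print(dataset["train"][0])  # {'text': ..., 'label': ...} with an integer class id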

The following table shows examples from the train split of the dataset:

Text | Type of emotion
---- | ---------------
im grabbing a minute to post i feel greedy wrong | Anger
i am ever feeling nostalgic about the fireplace i will know that it is still on the property | Love
ive been taking or milligrams or times recommended amount and ive fallen asleep a lot faster but i also feel like so funny | Fear
on a boat trip to denmark | Joy
i feel you know basically like a fake in the realm of science fiction | Sadness
i began having them several times a week feeling tortured by the hallucinations moving people and figures sounds and vibrations | Fear

Anomaly Detection Algorithm

HDBSCAN is a clustering algorithm that extends DBSCAN by converting it into a hierarchical clustering algorithm and then extracting a flat clustering based on the stability of clusters. Once trained, the model predicts -1 if a new data point is an outlier; otherwise, it predicts one of the existing clusters.
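For context, a minimal sketch of how such a model can be trained and queried with the hdbscan library follows. The variables train_embeddings and new_embeddings are placeholders for arrays of embedding vectors, and the hyperparameter values are illustrative, not the ones used for the example's model:

    import hdbscan

    # prediction_data=True is required so that approximate_predict() can
    # score points that were not part of the training data.
    clusterer = hdbscan.HDBSCAN(min_cluster_size=15, prediction_data=True)
    clusterer.fit(train_embeddings)

    # labels contains a cluster id per point; -1 marks an outlier (anomaly).
    labels, strengths = hdbscan.approximate_predict(clusterer, new_embeddings)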

Ingestion to Pub/Sub

First, ingest the data into Pub/Sub so that the anomaly detection pipeline can read the tweets as a stream. Pub/Sub is a messaging service for exchanging event data among applications and services. Streaming analytics and data integration pipelines use Pub/Sub to ingest and distribute data.

You can see the full example code for ingesting data into Pub/Sub in GitHub.

The file structure for the ingestion pipeline is shown in the following diagram:

write_data_to_pubsub_pipeline/
├── pipeline/
│   ├── __init__.py
│   ├── options.py
│   └── utils.py
├── __init__.py
├── config.py
├── main.py
└── setup.py

pipeline/utils.py contains the code for loading the emotion dataset and two beam.DoFn classes that are used for data transformation.

pipeline/options.py contains the pipeline options to configure the Dataflow pipeline.

config.py defines variables that are used multiple times, like the Google Cloud PROJECT_ID and NUM_WORKERS.

setup.py defines the packages and requirements for the pipeline to run.

main.py contains the pipeline code and additional functions used for running the pipeline.

Run the Pipeline

To run the pipeline, first install the required packages. For this example, you need access to a Google Cloud project, and you need to configure the Google Cloud variables, like PROJECT_ID, REGION, Pub/Sub TOPIC_ID, and others, in the config.py file. You can run the pipeline in one of two ways:

  1. Locally on your machine: python main.py
  2. On GCP for Dataflow: python main.py --mode cloud

The write_data_to_pubsub_pipeline contains four different transforms (a sketch of how they fit together follows the list):

  1. Load the emotion dataset using Hugging Face datasets (for simplicity, we take samples from three classes instead of six).
  2. Associate each piece of text with a unique identifier (UID).
  3. Convert the text into the format that Pub/Sub expects.
  4. Write the formatted message to Pub/Sub.
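The following is a minimal sketch of the wiring in main.py. It assumes docs is the list of text samples loaded from the dataset, and the DoFn names AssignUniqueID and ConvertToPubSubMessage are illustrative stand-ins for the helpers defined in pipeline/utils.py:

    import apache_beam as beam
    from apache_beam.io.gcp.pubsub import WriteToPubSub

    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (
            pipeline
            # docs is a list of text samples loaded with Hugging Face datasets.
            | "Load Documents" >> beam.Create(docs)
            | "Assign Unique ID" >> beam.ParDo(AssignUniqueID())
            | "Convert to PubSub Message" >> beam.ParDo(ConvertToPubSubMessage())
            | "Write to PubSub" >> WriteToPubSub(topic=cfg.TOPIC_ID, with_attributes=True)
        )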

Anomaly Detection on Streaming Data

After ingesting the data to Pub/Sub, run the anomaly detection pipeline. This pipeline reads the streaming messages from Pub/Sub, converts each text to an embedding using a language model, and feeds the embedding to an already-trained clustering model to predict whether the message is an anomaly. One prerequisite for this pipeline is an HDBSCAN clustering model trained on the training split of the dataset.

You can find the full example code for anomaly detection in GitHub.

The following diagram shows the file structure for the anomaly_detection pipeline:

anomaly_detection_pipeline/
├── pipeline/
│   ├── __init__.py
│   ├── options.py
│   └── transformations.py
├── __init__.py
├── config.py
├── main.py
└── setup.py

pipeline/transformations.py contains the code for the different beam.DoFn classes and additional functions that are used in the pipeline.

pipeline/options.py contains the pipeline options to configure the Dataflow pipeline.

config.py defines variables that are used multiple times, like the Google Cloud PROJECT_ID and NUM_WORKERS.

setup.py defines the packages and requirements for the pipeline to run.

main.py contains the pipeline code and additional functions used to run the pipeline.

Run the Pipeline

Install the required packages and push the data to Pub/Sub. For this example, you need access to a Google Cloud project, and you need to configure the Google Cloud variables, like PROJECT_ID, REGION, Pub/Sub SUBSCRIPTION_ID, and others, in the config.py file. You can run the pipeline in one of two ways:

  1. Locally on your machine: python main.py
  2. On GCP for Dataflow: python main.py --mode cloud

The pipeline includes the following steps:

  1. Read the message from Pub/Sub.
  2. Convert the Pub/Sub message into a PCollection of dictionaries where the key is the UID and the value is the Twitter text.
  3. Encode the text into transformer-readable token ID integers using a tokenizer.
  4. Use RunInference to get the vector embedding from a transformer-based language model.
  5. Normalize the embedding.
  6. Use RunInference to get anomaly prediction from a trained HDBSCAN clustering model.
  7. Write the prediction to BigQuery so that the clustering model can be retrained when needed.
  8. Send an email alert if an anomaly is detected.

The following code snippet shows the first two steps of the pipeline:

    docs = (
        pipeline
        | "Read from PubSub"
        >> ReadFromPubSub(subscription=cfg.SUBSCRIPTION_ID, with_attributes=True)
        | "Decode PubSubMessage" >> beam.ParDo(Decode())
    )
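Here, Decode is a beam.DoFn from pipeline/transformations.py. A minimal sketch of its shape, assuming the tweet text travels as the message body and the UID as a message attribute named id:

import apache_beam as beam


class Decode(beam.DoFn):
  """Sketch: unpacks a PubsubMessage into a {'text': ..., 'id': ...} dict."""
  def process(self, element):
    # element is a PubsubMessage because with_attributes=True is set.
    yield {
        "text": element.data.decode("utf-8"),
        "id": element.attributes["id"],
    }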

The next sections describe the embedding and prediction steps of this pipeline in more detail.

Get Embedding from a Language Model

In order to do clustering with text data, first map the text into vectors of numerical values suitable for statistical analysis. This example uses a transformer-based language model called sentence-transformers/stsb-distilbert-base. This model maps sentences and paragraphs to a 768-dimensional dense vector space, and you can use it for tasks like clustering or semantic search.

Because the language model expects a tokenized input instead of raw text, start by tokenizing the text. Tokenization is a preprocessing task that transforms text so that it can be fed into the model to get predictions.

    normalized_embedding = (
        docs
        | "Tokenize Text" >> beam.Map(tokenize_sentence)

Here, tokenize_sentence is a function that takes a dictionary with a text and an ID, tokenizes the text, and returns a tuple of the text and ID, along with the tokenized output.

import torch
from transformers import AutoTokenizer

Tokenizer = AutoTokenizer.from_pretrained(cfg.TOKENIZER_NAME)


def tokenize_sentence(input_dict):
  """Takes a dictionary with a text and an id, tokenizes the text, and
  returns a tuple of the text and id as well as the tokenized text.

  Args:
    input_dict: a dictionary with the text and id of the sentence

  Returns:
    A tuple of the text and id, and a dictionary of the tokens.
  """
  text, uid = input_dict["text"], input_dict["id"]
  tokens = Tokenizer([text], padding=True, truncation=True, return_tensors="pt")
  tokens = {key: torch.squeeze(val) for key, val in tokens.items()}
  return (text, uid), tokens

The tokenized output is then passed to the language model. To get embeddings from the language model, we use RunInference() from Apache Beam.

    | "Get Embedding" >> RunInference(KeyedModelHandler(embedding_model_handler))
where embedding_model_handler is:

    embedding_model_handler = PytorchNoBatchModelHandler(
        state_dict_path=cfg.MODEL_STATE_DICT_PATH,
        model_class=ModelWrapper,
        model_params={"config": AutoConfig.from_pretrained(cfg.MODEL_CONFIG_PATH)},
        device="cpu",
    )

We define PytorchNoBatchModelHandler as a wrapper to PytorchModelHandlerKeyedTensor to limit the batch size to one.

from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor


# Can be removed once https://github.com/apache/beam/issues/21863 is fixed.
class PytorchNoBatchModelHandler(PytorchModelHandlerKeyedTensor):
  """Wrapper to PytorchModelHandlerKeyedTensor to limit batch size to 1.

  The tokenized strings generated from BertTokenizer may have different
  lengths, which doesn't work with torch.stack() in the current RunInference
  implementation, since stack() requires tensors to be the same size.
  Restricting max_batch_size to 1 means there is only one example per `batch`
  in the run_inference() call.
  """
  def batch_elements_kwargs(self):
    return {"max_batch_size": 1}

Because forward() for DistilBertModel doesn't return the embeddings, we define a custom model_class, ModelWrapper, to get the vector embedding.

import torch
from transformers import DistilBertModel


class ModelWrapper(DistilBertModel):
  """Wrapper to DistilBertModel to get embeddings when calling the
  forward function."""
  def forward(self, **kwargs):
    output = super().forward(**kwargs)
    sentence_embedding = (
        self.mean_pooling(output,
                          kwargs["attention_mask"]).detach().cpu().numpy())
    return sentence_embedding

  def mean_pooling(self, model_output, attention_mask):
    """Calculates the mean of token embeddings, taking the attention mask
    into account for correct averaging.

    Args:
      model_output: The output of the model.
      attention_mask: A tensor that contains 1s for all input tokens and
        0s for all padding tokens.

    Returns:
      The mean of the token embeddings.
    """
    # The first element of model_output contains all token embeddings.
    token_embeddings = model_output[0]
    input_mask_expanded = (
        attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float())
    return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
        input_mask_expanded.sum(1), min=1e-9)
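To make the pooling arithmetic concrete, here is a toy check (not part of the pipeline): with the padding token masked out, only the real token contributes to the mean:

    import torch

    # One sentence, two tokens, embedding size 2; the second token is padding.
    token_embeddings = torch.tensor([[[1.0, 2.0], [5.0, 6.0]]])
    attention_mask = torch.tensor([[1, 0]])

    mask = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    mean = torch.sum(token_embeddings * mask, 1) / torch.clamp(mask.sum(1), min=1e-9)
    print(mean)  # tensor([[1., 2.]]) -- the padding token is ignored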

After getting the embedding for each piece of Twitter text, normalize the embedding, because the trained clustering model expects normalized embeddings.

    | "Normalize Embedding" >> beam.ParDo(NormalizeEmbedding())

Get Predictions

The normalized embeddings are then forwarded to the trained HDBSCAN model to get the predictions.

    predictions = (
        normalized_embedding
        | "Get Prediction from Clustering Model"
        >> RunInference(model_handler=clustering_model_handler)
    )

where clustering_model_handler is:

    clustering_model_handler = KeyedModelHandler(
        CustomSklearnModelHandlerNumpy(
            model_uri=cfg.CLUSTERING_MODEL_PATH, model_file_type=ModelFileType.JOBLIB
        )
    )

We define CustomSklearnModelHandlerNumpy as a wrapper to SklearnModelHandlerNumpy to limit batch size to one and to override run_inference so that hdbscan.approximate_predict() is used to get anomaly predictions.

import hdbscan
import numpy as np
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
from apache_beam.ml.inference.sklearn_inference import _validate_inference_args


class CustomSklearnModelHandlerNumpy(SklearnModelHandlerNumpy):
  # Limiting batch size to 1 can be removed once
  # https://github.com/apache/beam/issues/21863 is fixed.
  def batch_elements_kwargs(self):
    """Limit batch size to 1 for inference."""
    return {"max_batch_size": 1}

  # run_inference can be removed once
  # https://github.com/apache/beam/issues/22572 is fixed.
  def run_inference(self, batch, model, inference_args=None):
    """Runs inferences on a batch of numpy arrays.

    Args:
      batch: A sequence of examples as numpy arrays. They should
        be single examples.
      model: A numpy model or pipeline. Must implement predict(X),
        where the parameter X is a numpy array.
      inference_args: Any additional arguments for an inference.

    Returns:
      An Iterable of type PredictionResult.
    """
    _validate_inference_args(inference_args)
    vectorized_batch = np.vstack(batch)
    predictions = hdbscan.approximate_predict(model, vectorized_batch)
    return [PredictionResult(x, y) for x, y in zip(batch, predictions)]

After getting the model predictions, decode the output from RunInference into a dictionary. Next, store the prediction in a BigQuery table so that the clustering model can be retrained when needed, and send an email alert if the prediction is an anomaly.
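DecodePrediction is defined in pipeline/transformations.py. A hedged sketch of its shape, assuming the PredictionResult carries the approximate_predict() output and noting that the BigQuery column names shown here are illustrative:

import apache_beam as beam


class DecodePrediction(beam.DoFn):
  """Sketch: converts a keyed PredictionResult into a BigQuery row."""
  def process(self, element):
    (text, uid), prediction = element
    # approximate_predict() returns (labels, strengths); -1 means outlier.
    cluster = int(prediction.inference[0])
    yield {"id": uid, "text": text, "cluster": cluster}

The last two steps of the pipeline then write the decoded rows to BigQuery and trigger the alert: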

    _ = (
        predictions
        | "Decode Prediction" >> beam.ParDo(DecodePrediction())
        | "Write to BQ" >> beam.io.WriteToBigQuery(
            table=cfg.TABLE_URI,
            schema=cfg.TABLE_SCHEMA,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        ))

    _ = predictions | "Alert by Email" >> beam.ParDo(TriggerEmailAlert())
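TriggerEmailAlert also lives in pipeline/transformations.py. The example's actual alerting code isn't shown here; the following is an illustrative sketch using Python's standard smtplib, with placeholder addresses and an assumed local SMTP server:

import smtplib
from email.message import EmailMessage

import apache_beam as beam


class TriggerEmailAlert(beam.DoFn):
  """Sketch: emails an alert when the predicted cluster label is -1."""
  def process(self, element):
    (text, uid), prediction = element
    if prediction.inference[0] == -1:  # -1 marks an outlier (anomaly)
      msg = EmailMessage()
      msg["Subject"] = f"Anomaly detected (uid={uid})"
      msg["From"] = "alerts@example.com"  # placeholder sender
      msg["To"] = "oncall@example.com"  # placeholder recipient
      msg.set_content(text)
      with smtplib.SMTP("localhost") as server:
        server.send_message(msg)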