MongoDB Triggers for Reporting

MongoDB Atlas triggers watch collections for changes and forward the data to a Google Cloud Function via HTTP POST. The Cloud Function then handles writing to BigQuery.

Why a Cloud Function?

The BigQuery client libraries (Python, Node.js) cannot be used inside MongoDB Atlas trigger functions. Atlas triggers run in a sandboxed JavaScript environment whose outbound calls go through context.http, so a Cloud Function acts as middleware between the trigger and BigQuery:

  1. MongoDB Trigger detects a change and sends the payload via HTTP POST
  2. Cloud Function receives the payload and uses the BigQuery Python client (google.cloud.bigquery) to insert the data

Prerequisites

  • A MongoDB Atlas cluster with access to the App Services interface
  • A Google Cloud Function (Python) deployed to receive the trigger payloads
  • A BigQuery dataset and table to store the synced data

Creating a Trigger in MongoDB Atlas

  1. Go to MongoDB Atlas > App Services > select your application
  2. In the sidebar, click Triggers
  3. Click Add a Trigger

Trigger Configuration

Field                         Value
----------------------------  -----------------------------------------------------
Trigger Type                  Database
Name                          Descriptive name, e.g. syncOrdersToBigQuery
Enabled                       ON
Skip Events on Re-Enable      Depending on needs (ON to ignore missed events)
Event Ordering                ON if execution order matters
Cluster Name                  The source cluster
Database Name                 The source database, e.g. dingoo
Collection Name               The collection to watch, e.g. orders
Operation Type                Check the operations: Insert, Update, Delete, Replace
Full Document                 ON (to receive the full document after modification)
Full Document Before Change   ON if the document before modification is needed

Trigger Function

In the Function section, write the function that forwards the change event to the Cloud Function.

The trigger sends a standardized payload containing:

  • operation — the type of change (insert, update, replace, delete)
  • document_id — the MongoDB document _id as a string
  • timestamp — ISO timestamp of when the event was processed
  • collection — the source collection name
  • data — the full document as a JSON string (empty "{}" for deletes)

exports = async function (changeEvent) {
  const { operationType, fullDocument, documentKey, ns } = changeEvent;

  const cloudFunctionUrl = "<CLOUD_FUNCTION_URL>";
  const isDelete = operationType === "delete";

  // Delete events carry no fullDocument, only documentKey.
  // For insert/update/replace, skip the event if fullDocument is missing.
  if (!isDelete && !fullDocument) {
    console.log("No fullDocument, skipping");
    return;
  }

  const row = {
    operation: operationType,
    document_id: documentKey._id.toString(),
    timestamp: new Date().toISOString(),
    collection: ns.coll,
    data: isDelete ? "{}" : JSON.stringify(fullDocument),
  };

  const response = await context.http.post({
    url: cloudFunctionUrl,
    headers: {
      "Content-Type": ["application/json"],
    },
    body: JSON.stringify(row),
  });

  // context.http responses expose the numeric code as statusCode
  // (status is a string such as "200 OK").
  console.log(`Response: ${response.statusCode}`);
  console.log(`Body: ${response.body.text()}`);

  return response;
};
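
On the receiving side, the payload fields listed above can be checked before any BigQuery work is done. The following is a minimal sketch rather than part of the original function: the TriggerPayload class and parse_payload helper are hypothetical names, and the validation rules (all fields except data required, operation limited to the four listed values) are assumptions drawn from the payload description.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict

# Operation values named in the payload description above.
ALLOWED_OPERATIONS = {"insert", "update", "replace", "delete"}


@dataclass(frozen=True)
class TriggerPayload:
    """Hypothetical container for the five payload fields."""
    operation: str
    document_id: str
    timestamp: str
    collection: str
    data: Dict[str, Any]


def parse_payload(body: Dict[str, Any]) -> TriggerPayload:
    """Validate the trigger payload and decode the nested `data` JSON string."""
    required = ("operation", "document_id", "timestamp", "collection")
    missing = [key for key in required if not body.get(key)]
    if missing:
        raise ValueError(f"missing fields: {', '.join(missing)}")
    if body["operation"] not in ALLOWED_OPERATIONS:
        raise ValueError(f"unsupported operation: {body['operation']}")

    # `data` is itself a JSON string ("{}" for deletes), so decode it once more.
    data = json.loads(body.get("data") or "{}")
    if not isinstance(data, dict):
        raise ValueError("data must decode to a JSON object")

    return TriggerPayload(
        operation=body["operation"],
        document_id=body["document_id"],
        timestamp=body["timestamp"],
        collection=body["collection"],
        data=data,
    )
```

Raising ValueError keeps the helper independent of any web framework; the caller can translate it into a 400 response.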

Securing the Trigger with a Secret

To prevent unauthorized requests to your Cloud Function, use a shared secret: the trigger sends it in a header, and the Cloud Function rejects any request that does not match.

1. Create the Secret in MongoDB Atlas

Reference: MongoDB Atlas — Secret Values

  1. In Atlas, go to the Triggers page for your project
  2. Click the Linked App Service: Triggers link, then in the sidebar click Values under the Build heading
  3. Click Create a Value
  4. Enter a name for the secret (e.g. DEV_TRIGGER_SECRET) — names may only contain ASCII letters, numbers, underscores, and hyphens, and must start with a letter or number
  5. Select type Secret and enter the secret value in the Add Content box (max 500 characters)
  6. Click Save (and Review & Deploy if deployment drafts are enabled)
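
The steps above do not say how to produce the secret value itself. One possible approach, shown here as a sketch, is to generate a random URL-safe string with Python's standard secrets module and check it against the 500-character limit from step 5. The function name generate_trigger_secret and the default of 48 random bytes are assumptions, not something the original setup prescribes.

```python
import secrets

MAX_SECRET_LENGTH = 500  # limit stated for Atlas secret values in step 5


def generate_trigger_secret(num_bytes: int = 48) -> str:
    """Return a random URL-safe string suitable for use as a shared secret."""
    value = secrets.token_urlsafe(num_bytes)
    if len(value) > MAX_SECRET_LENGTH:
        raise ValueError(
            f"secret is {len(value)} characters, limit is {MAX_SECRET_LENGTH}"
        )
    return value


if __name__ == "__main__":
    # Print once, then paste the same value into Atlas and Secret Manager.
    print(generate_trigger_secret())
```

With the default of 48 bytes the result is 64 characters long, well under the limit.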

2. Store the Secret in Google Secret Manager (for Cloud Build)

  1. Go to Google Cloud Console > Secret Manager
  2. Click Create Secret
  3. Name it DEV_TRIGGER_SECRET and paste the same value used in MongoDB Atlas
  4. Click Create
  5. Cloud Build reads this secret and injects it as the TRIGGER_SECRET environment variable when deploying the Cloud Function — no Secret Manager client code is needed at runtime

3. Updated Trigger Function

The trigger reads the value via context.values.get() and sends it in the X-Trigger-Secret header:

exports = async function (changeEvent) {
  const { operationType, fullDocument, documentKey, ns } = changeEvent;

  const cloudFunctionUrl = "<CLOUD_FUNCTION_URL>";
  const secret = context.values.get("DEV_TRIGGER_SECRET");
  const isDelete = operationType === "delete";

  const headers = {
    "Content-Type": ["application/json"],
    "X-Trigger-Secret": [secret],
  };

  // Delete events carry no fullDocument, only documentKey.
  // For insert/update/replace, skip the event if fullDocument is missing.
  if (!isDelete && !fullDocument) {
    console.log("No fullDocument, skipping");
    return;
  }

  const row = {
    operation: operationType,
    document_id: documentKey._id.toString(),
    timestamp: new Date().toISOString(),
    collection: ns.coll,
    data: isDelete ? "{}" : JSON.stringify(fullDocument),
  };

  const response = await context.http.post({
    url: cloudFunctionUrl,
    headers,
    body: JSON.stringify(row),
  });

  // context.http responses expose the numeric code as statusCode
  // (status is a string such as "200 OK").
  console.log(`Response: ${response.statusCode}`);
  console.log(`Body: ${response.body.text()}`);

  return response;
};

Cloud Function (BigQuery Ingestion)

The Cloud Function receives the payload from the MongoDB trigger and inserts the data into BigQuery using the Python google.cloud.bigquery client.

Dependencies (requirements.txt):

functions-framework
google-cloud-bigquery

Generic function skeleton (main.py):

import datetime
import hmac
import json
import os

import functions_framework
from google.cloud import bigquery


def utc_now_iso():
    """Current UTC time as an ISO 8601 string with a trailing Z."""
    now = datetime.datetime.now(datetime.timezone.utc)
    return now.isoformat().replace("+00:00", "Z")


@functions_framework.http
def main(request):
    try:
        # Reject the request if the secret is not configured or does not match.
        # Without the emptiness check, an unset TRIGGER_SECRET would accept
        # any request that omits the header.
        expected_secret = os.environ.get("TRIGGER_SECRET", "")
        provided_secret = request.headers.get("X-Trigger-Secret", "")
        if not expected_secret or not hmac.compare_digest(
            provided_secret.encode(), expected_secret.encode()
        ):
            return json.dumps({"error": "Unauthorized"}), 401

        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return json.dumps({"error": "Invalid JSON payload"}), 400

        operation = data.get("operation")
        document_id = data.get("document_id")

        print(f"Operation: {operation}, document_id: {document_id}")

        client = bigquery.Client()
        table_id = "<PROJECT_ID>.<DATASET>.<TABLE>"

        # Handle DELETE - mark as archived
        if operation == "delete":
            row = {
                "mongoId": document_id,
                "eventTimestamp": utc_now_iso(),
                "deleted": False,
                "archived": True,
            }

            errors = client.insert_rows_json(table_id, [row])

            if errors:
                print(f"BigQuery errors: {errors}")
                return json.dumps({"error": str(errors)}), 500

            return json.dumps({"success": True, "action": "marked_inactive"}), 200

        # Handle INSERT / UPDATE / REPLACE
        # The "data" field is a JSON string, so it needs a second decode.
        doc = json.loads(data.get("data", "{}"))

        # Build the row matching your BigQuery table schema
        row = {
            "mongoId": document_id,
            "eventTimestamp": utc_now_iso(),
            "deleted": False,
            # Map your document fields here:
            # "fieldName": doc.get("fieldName"),
            # Use helper functions below for special MongoDB types
        }

        errors = client.insert_rows_json(table_id, [row])

        if errors:
            print(f"BigQuery errors: {errors}")
            return json.dumps({"error": str(errors)}), 500

        return json.dumps({"success": True, "action": "inserted"}), 200

    except Exception as e:
        print(f"Exception: {e}")
        return json.dumps({"error": str(e)}), 500
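
Because insert_rows_json reports schema mismatches only after the request reaches BigQuery, it can help to check a row locally first. The sketch below is illustrative only: the column types and modes in TABLE_SCHEMA are assumptions (the original does not show the table schema), written in the name/type/mode form used by BigQuery JSON schema files, and check_row is a hypothetical helper.

```python
from typing import Any, Dict, List

# Assumed schema for the fixed columns used in the skeleton above.
# Adjust names, types, and modes to match the real table.
TABLE_SCHEMA: List[Dict[str, str]] = [
    {"name": "mongoId", "type": "STRING", "mode": "REQUIRED"},
    {"name": "eventTimestamp", "type": "TIMESTAMP", "mode": "REQUIRED"},
    {"name": "deleted", "type": "BOOLEAN", "mode": "NULLABLE"},
    {"name": "archived", "type": "BOOLEAN", "mode": "NULLABLE"},
]


def check_row(row: Dict[str, Any], schema: List[Dict[str, str]] = TABLE_SCHEMA) -> List[str]:
    """Return a list of problems: unknown columns and missing REQUIRED ones."""
    known = {field["name"] for field in schema}
    problems = [f"unknown column: {key}" for key in row if key not in known]
    problems += [
        f"missing required column: {field['name']}"
        for field in schema
        if field["mode"] == "REQUIRED" and row.get(field["name"]) is None
    ]
    return problems
```

An empty list means the row only uses known columns and fills every required one; it does not verify value types.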

MongoDB Extended JSON Helpers

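Documents forwarded by the trigger can contain MongoDB Extended JSON wrappers such as {"$oid": "..."} or {"$date": {"$numberLong": "..."}}. The helper functions below accept both wrapped and plain values and return values that BigQuery can ingest: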

def get_id(obj):
    """Extract string ID from {"$oid": "..."} or plain string."""
    if obj is None:
        return None
    if isinstance(obj, str):
        return obj
    if isinstance(obj, dict) and "$oid" in obj:
        return obj.get("$oid")
    return str(obj)

def get_date(obj):
    """Convert {"$date": {"$numberLong": "..."}} to ISO timestamp."""
    if obj is None:
        return None
    if isinstance(obj, str):
        return obj
    if isinstance(obj, dict) and "$date" in obj:
        date_val = obj.get("$date")
        if isinstance(date_val, dict) and "$numberLong" in date_val:
            # $numberLong holds milliseconds since the Unix epoch
            ts = int(date_val["$numberLong"]) / 1000
            # utcfromtimestamp() is deprecated since Python 3.12
            dt = datetime.datetime.fromtimestamp(ts, tz=datetime.timezone.utc)
            return dt.isoformat().replace("+00:00", "Z")
        # Relaxed form: {"$date": "<ISO string>"}
        return date_val
    return None

def get_number(obj):
    """Handle {"$numberDouble": "..."}, {"$numberLong": "..."} and {"$numberInt": "..."}."""
    if obj is None:
        return None
    if isinstance(obj, (int, float)):
        return float(obj)
    if isinstance(obj, dict) and "$numberDouble" in obj:
        val = obj["$numberDouble"]
        # Non-finite values are mapped to None (NULL in BigQuery)
        if val in ["Infinity", "-Infinity", "NaN"]:
            return None
        return float(val)
    if isinstance(obj, dict):
        for key in ("$numberLong", "$numberInt"):
            if key in obj:
                return int(obj[key])
    return None

def get_location(loc):
    """Convert {lat, lng} to BigQuery GEOGRAPHY format POINT(lng lat)."""
    if not loc:
        return None
    lat = get_number(loc.get("lat")) if isinstance(loc.get("lat"), dict) else loc.get("lat")
    lng = get_number(loc.get("lng")) if isinstance(loc.get("lng"), dict) else loc.get("lng")
    if lat is not None and lng is not None:
        return f"POINT({lng} {lat})"
    return None

Usage in the row mapping:

row = {
    "mongoId": document_id,
    "eventTimestamp": datetime.datetime.utcnow().isoformat() + "Z",
    "someReference": get_id(doc.get("someReference")),      # ObjectId field
    "createdAt": get_date(doc.get("createdAt")),             # Date field
    "amount": get_number(doc.get("amount")),                 # Number field
    "location": get_location(doc.get("location")),           # {lat, lng} field
    "name": doc.get("name"),                                 # Plain string
}
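
For documents with many fields, mapping each one through a helper can become repetitive. As an alternative sketch (not the approach used above), a single recursive function can strip the common Extended JSON wrappers from a whole document before the row is built. The name unwrap_ejson is hypothetical, and only the $oid, $date, $numberInt, $numberLong, and $numberDouble wrappers are handled.

```python
import datetime
import math
from typing import Any


def unwrap_ejson(value: Any) -> Any:
    """Recursively replace common Extended JSON wrappers with plain values."""
    if isinstance(value, list):
        return [unwrap_ejson(item) for item in value]
    if not isinstance(value, dict):
        return value

    if len(value) == 1:
        key, inner = next(iter(value.items()))
        if key == "$oid":
            return inner
        if key in ("$numberInt", "$numberLong"):
            return int(inner)
        if key == "$numberDouble":
            number = float(inner)
            # NaN and +/-Infinity have no useful BigQuery mapping here
            return number if math.isfinite(number) else None
        if key == "$date":
            # Canonical form nests {"$numberLong": "<ms since epoch>"}
            inner = unwrap_ejson(inner)
            if isinstance(inner, int):
                dt = datetime.datetime.fromtimestamp(
                    inner / 1000, tz=datetime.timezone.utc
                )
                return dt.isoformat().replace("+00:00", "Z")
            return inner  # relaxed form: already an ISO string

    # Ordinary sub-document: unwrap each field
    return {k: unwrap_ejson(v) for k, v in value.items()}
```

For example, unwrap_ejson({"createdAt": {"$date": {"$numberLong": "1700000000000"}}}) returns {"createdAt": "2023-11-14T22:13:20Z"}. Fields that need a different target type, such as the {lat, lng} location, still need their own mapping.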

Adding a Trigger for a New Collection

To sync an additional collection, create a new trigger following the same pattern:

  1. Add a Trigger in App Services
  2. Set the Collection Name to the new collection
  3. Use the same function template above — the collection field in the payload is automatically set from changeEvent.ns.coll
  4. Update the Cloud Function to handle the new collection's data
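
Step 4 can be kept manageable by routing on the payload's collection field. The following is one possible layout, offered as a sketch: the collection names other than orders, the table IDs, the mapped fields (status, email), and the helper names are all hypothetical placeholders.

```python
from typing import Any, Callable, Dict, Tuple

Row = Dict[str, Any]
RowMapper = Callable[[str, Dict[str, Any]], Row]


def map_order(document_id: str, doc: Dict[str, Any]) -> Row:
    """Hypothetical mapper for the orders collection."""
    return {"mongoId": document_id, "status": doc.get("status")}


def map_customer(document_id: str, doc: Dict[str, Any]) -> Row:
    """Hypothetical mapper for a customers collection."""
    return {"mongoId": document_id, "email": doc.get("email")}


# collection name -> (BigQuery table ID, row mapper)
ROUTES: Dict[str, Tuple[str, RowMapper]] = {
    "orders": ("<PROJECT_ID>.<DATASET>.orders", map_order),
    "customers": ("<PROJECT_ID>.<DATASET>.customers", map_customer),
}


def route(collection: str, document_id: str, doc: Dict[str, Any]) -> Tuple[str, Row]:
    """Pick the target table and build the row for the given collection."""
    try:
        table_id, mapper = ROUTES[collection]
    except KeyError:
        raise ValueError(f"no route configured for collection: {collection}") from None
    return table_id, mapper(document_id, doc)
```

Adding a collection then means writing one mapper and adding one entry to ROUTES, while the HTTP handling and the BigQuery insert stay unchanged.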

Best Practices

  • Full Document: always enable it so that insert/update events contain the complete document
  • Null check: always guard against a missing fullDocument before processing. For update events it can be null, for example when the document is deleted before the lookup that fetches it runs
  • Secret: protect the Cloud Function URL with a shared secret — store it in MongoDB Atlas Values and Google Secret Manager, and reject requests with a missing or wrong X-Trigger-Secret header
  • Logging: use console.log() to log response status and body (visible in App Services > Logs)
  • Monitoring: check trigger logs in App Services > Logs to diagnose failures