MongoDB Triggers for Reporting

MongoDB Atlas triggers watch collections for changes and forward the data to a Google Cloud Function via HTTP POST. The Cloud Function then handles writing to BigQuery.

Why a Cloud Function?

The BigQuery API is not directly accessible from MongoDB Atlas trigger functions. Atlas triggers run in a sandboxed JavaScript environment that only supports outbound HTTP calls via context.http. Since the BigQuery client libraries (Python, Node.js) cannot run inside Atlas triggers, a Cloud Function acts as middleware:

  1. MongoDB Trigger detects a change and sends the payload via HTTP POST
  2. Cloud Function receives the payload and uses the BigQuery Python client (google.cloud.bigquery) to insert the data

Prerequisites

  • A MongoDB Atlas cluster with access to the App Services interface
  • A Google Cloud Function (Python) deployed to receive the trigger payloads
  • A BigQuery dataset and table to store the synced data

Creating a Trigger in MongoDB Atlas

  1. Go to MongoDB Atlas > App Services > select your application
  2. In the sidebar, click Triggers
  3. Click Add a Trigger

Trigger Configuration

| Field | Value |
|-------|-------|
| Trigger Type | Database |
| Name | Descriptive name, e.g. syncOrdersToBigQuery |
| Enabled | ON |
| Skip Events on Re-Enable | Depends on your needs (ON to ignore events missed while the trigger was disabled) |
| Event Ordering | ON if execution order matters |
| Cluster Name | The source cluster |
| Database Name | The source database, e.g. dingoo |
| Collection Name | The collection to watch, e.g. orders |
| Operation Type | Check the operations to forward: Insert, Update, Delete, Replace |
| Full Document | ON (to receive the full document after modification) |
| Full Document Before Change | ON if the document before modification is needed |

Trigger Function

In the Function section, write the function that forwards the change event to the Cloud Function.

The trigger sends a standardized payload containing:

  • operation — the type of change (insert, update, replace, delete)
  • document_id — the MongoDB document _id as a string
  • timestamp — ISO timestamp of when the event was processed
  • collection — the source collection name
  • data — the full document as a JSON string (empty "{}" for deletes)

exports = async function (changeEvent) {
  const { operationType, fullDocument, documentKey } = changeEvent;

  const cloudFunctionUrl = "<CLOUD_FUNCTION_URL>";

  // Deletes only carry documentKey; every other operation needs fullDocument
  if (operationType !== "delete" && !fullDocument) {
    console.log("No fullDocument, skipping");
    return;
  }

  const row = {
    operation: operationType,
    document_id: documentKey._id.toString(),
    timestamp: new Date().toISOString(),
    collection: changeEvent.ns.coll,
    data: operationType === "delete" ? "{}" : JSON.stringify(fullDocument),
  };

  const response = await context.http.post({
    url: cloudFunctionUrl,
    headers: {
      "Content-Type": ["application/json"],
    },
    body: JSON.stringify(row),
  });

  console.log(`Response: ${response.status.code}`);
  console.log(`Body: ${response.body.text()}`);

  return response;
};
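
On the receiving side, the Cloud Function sees this POST body as plain JSON. As a minimal sketch (the document_id and timestamp values are illustrative), a delete payload deserializes like this:

```python
import json

# Illustrative POST body as the trigger above would produce for a delete event
body = json.dumps({
    "operation": "delete",
    "document_id": "64f1a2b3c4d5e6f7a8b9c0d1",
    "timestamp": "2024-01-15T10:30:00.000Z",
    "collection": "orders",
    "data": "{}",
})

payload = json.loads(body)
doc = json.loads(payload["data"])  # "{}" for deletes -> empty dict
print(payload["operation"], payload["collection"], doc)
```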

Cloud Function (BigQuery Ingestion)

The Cloud Function receives the payload from the MongoDB trigger and inserts the data into BigQuery using the Python google.cloud.bigquery client.

Dependencies (requirements.txt):

functions-framework
google-cloud-bigquery

Generic function skeleton (main.py):

import functions_framework
from google.cloud import bigquery
import json
import datetime


@functions_framework.http
def main(request):
    try:
        data = request.get_json()
        operation = data.get("operation")
        document_id = data.get("document_id")

        print(f"Operation: {operation}, document_id: {document_id}")

        client = bigquery.Client()
        table_id = "<PROJECT_ID>.<DATASET>.<TABLE>"

        # Handle DELETE - mark the row as deleted/archived
        if operation == "delete":
            row = {
                "mongoId": document_id,
                "sync_timestamp": datetime.datetime.utcnow().isoformat() + "Z",
                "deleted": True,
                "archived": True,
            }

            errors = client.insert_rows_json(table_id, [row])

            if errors:
                print(f"BigQuery errors: {errors}")
                return json.dumps({"error": str(errors)}), 500

            return json.dumps({"success": True, "action": "archived"}), 200

        # Handle INSERT / UPDATE
        doc = json.loads(data.get("data", "{}"))

        # Build the row matching your BigQuery table schema
        row = {
            "mongoId": document_id,
            "sync_timestamp": datetime.datetime.utcnow().isoformat() + "Z",
            "deleted": False,
            # Map your document fields here:
            # "fieldName": doc.get("fieldName"),
            # Use helper functions below for special MongoDB types
        }

        errors = client.insert_rows_json(table_id, [row])

        if errors:
            print(f"BigQuery errors: {errors}")
            return json.dumps({"error": str(errors)}), 500

        return json.dumps({"success": True, "action": "inserted"}), 200

    except Exception as e:
        print(f"Exception: {e}")
        return json.dumps({"error": str(e)}), 500
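
The row mapping can be pulled out into a pure function, which makes it unit-testable without a BigQuery client or deployed function. This is a hypothetical refactor sketch, not part of the skeleton above; build_row and the "name" field are assumed names:

```python
import datetime
import json

def build_row(payload):
    """Build the BigQuery row for an insert/update payload (hypothetical helper)."""
    doc = json.loads(payload.get("data", "{}"))
    return {
        "mongoId": payload.get("document_id"),
        "sync_timestamp": datetime.datetime.utcnow().isoformat() + "Z",
        "deleted": False,
        # Map your document fields here, e.g.:
        "name": doc.get("name"),
    }

row = build_row({
    "operation": "insert",
    "document_id": "abc123",
    "data": json.dumps({"name": "test order"}),
})
print(row["mongoId"], row["name"])
```

In main(), the result of build_row would simply be passed to client.insert_rows_json as before.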

MongoDB Extended JSON Helpers

MongoDB triggers send documents in Extended JSON format. Use these helper functions to extract values:

def get_id(obj):
    """Extract string ID from {"$oid": "..."} or plain string."""
    if obj is None:
        return None
    if isinstance(obj, str):
        return obj
    if isinstance(obj, dict) and "$oid" in obj:
        return obj.get("$oid")
    return str(obj)

def get_date(obj):
    """Convert {"$date": {"$numberLong": "..."}} to ISO timestamp."""
    if obj is None:
        return None
    if isinstance(obj, str):
        return obj
    if isinstance(obj, dict) and "$date" in obj:
        date_val = obj.get("$date")
        if isinstance(date_val, dict) and "$numberLong" in date_val:
            ts = int(date_val["$numberLong"]) / 1000
            return datetime.datetime.utcfromtimestamp(ts).isoformat() + "Z"
        return date_val
    return None

def get_number(obj):
    """Handle {"$numberDouble": "..."} and {"$numberLong": "..."}."""
    if obj is None:
        return None
    if isinstance(obj, (int, float)):
        return float(obj)
    if isinstance(obj, dict) and "$numberDouble" in obj:
        val = obj["$numberDouble"]
        if val in ["Infinity", "-Infinity", "NaN"]:
            return None
        return float(val)
    if isinstance(obj, dict) and "$numberLong" in obj:
        return int(obj["$numberLong"])
    return None

def get_location(loc):
    """Convert {lat, lng} to BigQuery GEOGRAPHY format POINT(lng lat)."""
    if not loc:
        return None
    lat = get_number(loc.get("lat")) if isinstance(loc.get("lat"), dict) else loc.get("lat")
    lng = get_number(loc.get("lng")) if isinstance(loc.get("lng"), dict) else loc.get("lng")
    if lat is not None and lng is not None:
        return f"POINT({lng} {lat})"
    return None

Usage in the row mapping:

row = {
    "mongoId": document_id,
    "sync_timestamp": datetime.datetime.utcnow().isoformat() + "Z",
    "someReference": get_id(doc.get("someReference")),      # ObjectId field
    "createdAt": get_date(doc.get("createdAt")),             # Date field
    "amount": get_number(doc.get("amount")),                 # Number field
    "location": get_location(doc.get("location")),           # {lat, lng} field
    "name": doc.get("name"),                                 # Plain string
}
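
As a quick standalone check of the date and number helpers, here is a runnable sketch on sample Extended JSON values (the helper definitions are repeated from above so the snippet runs on its own; the sample values are illustrative):

```python
import datetime

def get_date(obj):
    """Convert {"$date": {"$numberLong": "..."}} to ISO timestamp (repeated from above)."""
    if obj is None:
        return None
    if isinstance(obj, str):
        return obj
    if isinstance(obj, dict) and "$date" in obj:
        date_val = obj.get("$date")
        if isinstance(date_val, dict) and "$numberLong" in date_val:
            ts = int(date_val["$numberLong"]) / 1000
            return datetime.datetime.utcfromtimestamp(ts).isoformat() + "Z"
        return date_val
    return None

def get_number(obj):
    """Handle {"$numberDouble": "..."} and {"$numberLong": "..."} (repeated from above)."""
    if obj is None:
        return None
    if isinstance(obj, (int, float)):
        return float(obj)
    if isinstance(obj, dict) and "$numberDouble" in obj:
        val = obj["$numberDouble"]
        if val in ["Infinity", "-Infinity", "NaN"]:
            return None
        return float(val)
    if isinstance(obj, dict) and "$numberLong" in obj:
        return int(obj["$numberLong"])
    return None

# Sample Extended JSON values as a trigger would serialize them
created_at = {"$date": {"$numberLong": "1700000000000"}}
amount = {"$numberDouble": "19.99"}

print(get_date(created_at))   # 2023-11-14T22:13:20Z
print(get_number(amount))     # 19.99
```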

Adding a Trigger for a New Collection

To sync an additional collection, create a new trigger following the same pattern:

  1. Add a Trigger in App Services
  2. Set the Collection Name to the new collection
  3. Use the same function template above — the collection field in the payload is automatically set from changeEvent.ns.coll
  4. Update the Cloud Function to handle the new collection's data
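
One way to let a single Cloud Function serve several collections is a lookup from the payload's collection field to the destination BigQuery table. A minimal sketch, with hypothetical route names and the same placeholder project/dataset as above:

```python
# Hypothetical mapping from source collection to destination BigQuery table
TABLE_ROUTES = {
    "orders": "<PROJECT_ID>.<DATASET>.orders",
    "customers": "<PROJECT_ID>.<DATASET>.customers",
}

def resolve_table(collection):
    """Return the BigQuery table for a collection, or None if it is not routed."""
    return TABLE_ROUTES.get(collection)

print(resolve_table("orders"))    # <PROJECT_ID>.<DATASET>.orders
print(resolve_table("unknown"))   # None
```

Returning a 200 for unrouted collections (after logging) avoids endless retries from triggers that were added before the function was updated.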

Best Practices

  • Full Document: always enable it so that insert/update events contain the complete document
  • Null check: always guard against missing fullDocument before processing — it can be null in edge cases
  • Logging: use console.log() to log response status and body (visible in App Services > Logs)
  • Monitoring: check trigger logs in App Services > Logs to diagnose failures