MongoDB Triggers for Reporting
MongoDB Atlas triggers watch collections for changes and forward the data to a Google Cloud Function via HTTP POST. The Cloud Function then handles writing to BigQuery.
Why a Cloud Function?
The BigQuery client libraries cannot be used from MongoDB Atlas trigger functions: Atlas triggers run in a sandboxed JavaScript environment that only supports outbound HTTP calls via context.http. Since the BigQuery client libraries (Python, Node.js) cannot run inside Atlas triggers, a Cloud Function acts as an intermediary:
- MongoDB Trigger detects a change and sends the payload via HTTP POST
- Cloud Function receives the payload and uses the BigQuery Python client (google.cloud.bigquery) to insert the data
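The payload contract between the two halves can be exercised before anything is deployed. Below is a stdlib-only stand-in that accepts the same POST and replies like the real function would; this is a sketch for local experimentation with the payload format only, since an Atlas trigger can only reach a publicly accessible URL (e.g. through a tunnel):

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

class EchoHandler(BaseHTTPRequestHandler):
    """Prints each trigger payload and replies 200, mimicking the Cloud Function."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length) or b"{}")
        print(f"{payload.get('operation')} on {payload.get('collection')}: "
              f"{payload.get('document_id')}")
        body = json.dumps({"success": True}).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(body)

# To run locally:
# HTTPServer(("0.0.0.0", 8080), EchoHandler).serve_forever()
```

Pointing a test trigger at such a receiver makes it easy to confirm the payload shape before involving BigQuery.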
Prerequisites
- A MongoDB Atlas cluster with access to the App Services interface
- A Google Cloud Function (Python) deployed to receive the trigger payloads
- A BigQuery dataset and table to store the synced data
Creating a Trigger in MongoDB Atlas
- Go to MongoDB Atlas > App Services > select your application
- In the sidebar, click Triggers
- Click Add a Trigger
Trigger Configuration
| Field | Value |
|---|---|
| Trigger Type | Database |
| Name | Descriptive name, e.g. syncOrdersToBigQuery |
| Enabled | ON |
| Skip Events on Re-Enable | Depending on needs (ON to ignore missed events) |
| Event Ordering | ON if execution order matters |
| Cluster Name | The source cluster |
| Database Name | The source database, e.g. dingoo |
| Collection Name | The collection to watch, e.g. orders |
| Operation Type | Check the operations: Insert, Update, Delete, Replace |
| Full Document | ON (to receive the full document after modification) |
| Full Document Before Change | ON if the document before modification is needed |
Trigger Function
In the Function section, write the function that forwards the change event to the Cloud Function.
The trigger sends a standardized payload containing:
- operation — the type of change (insert, update, replace, delete)
- document_id — the MongoDB document _id as a string
- timestamp — ISO timestamp of when the event was processed
- collection — the source collection name
- data — the full document as a JSON string (empty "{}" for deletes)
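For example, an insert on a hypothetical orders collection would produce a payload along these lines (all values illustrative; note that data carries the document in Extended JSON form):

```python
import json

# Illustrative Extended JSON document, as Atlas serializes it.
document = {
    "_id": {"$oid": "665f1c2ab1e8a3d4c5e6f701"},
    "amount": {"$numberDouble": "19.99"},
    "createdAt": {"$date": {"$numberLong": "1717500000000"}},
}

# The standardized payload the trigger POSTs to the Cloud Function.
payload = {
    "operation": "insert",
    "document_id": "665f1c2ab1e8a3d4c5e6f701",
    "timestamp": "2024-06-04T12:00:00.000Z",
    "collection": "orders",
    "data": json.dumps(document),
}
print(json.dumps(payload, indent=2))
```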
```javascript
exports = async function (changeEvent) {
  const { operationType, fullDocument, documentKey } = changeEvent;
  const cloudFunctionUrl = "<CLOUD_FUNCTION_URL>";

  // Insert/update/replace events should carry a fullDocument; skip if missing.
  // Delete events never have one, only documentKey.
  if (operationType !== "delete" && !fullDocument) {
    console.log("No fullDocument, skipping");
    return;
  }

  const row = {
    operation: operationType,
    document_id: documentKey._id.toString(),
    timestamp: new Date().toISOString(),
    collection: changeEvent.ns.coll,
    data: operationType === "delete" ? "{}" : JSON.stringify(fullDocument),
  };

  const response = await context.http.post({
    url: cloudFunctionUrl,
    headers: { "Content-Type": ["application/json"] },
    body: JSON.stringify(row),
  });

  console.log(`Response: ${response.status.code}`);
  console.log(`Body: ${response.body.text()}`);
  return response;
};
```
Cloud Function (BigQuery Ingestion)
The Cloud Function receives the payload from the MongoDB trigger and inserts the data into BigQuery using the Python google.cloud.bigquery client.
Dependencies (requirements.txt):
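Based on the imports used in the function skeleton, the file needs at least the following (versions left unpinned here; pin them for production deploys):

```
functions-framework
google-cloud-bigquery
```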
Generic function skeleton (main.py):
```python
import functions_framework
from google.cloud import bigquery
import json
import datetime


@functions_framework.http
def main(request):
    try:
        data = request.get_json()
        operation = data.get("operation")
        document_id = data.get("document_id")
        print(f"Operation: {operation}, document_id: {document_id}")

        client = bigquery.Client()
        table_id = "<PROJECT_ID>.<DATASET>.<TABLE>"

        # Handle DELETE - mark as archived
        if operation == "delete":
            row = {
                "mongoId": document_id,
                "sync_timestamp": datetime.datetime.utcnow().isoformat() + "Z",
                "deleted": False,
                "archived": True,
            }
            errors = client.insert_rows_json(table_id, [row])
            if errors:
                print(f"BigQuery errors: {errors}")
                return json.dumps({"error": str(errors)}), 500
            return json.dumps({"success": True, "action": "marked_inactive"}), 200

        # Handle INSERT / UPDATE
        doc = json.loads(data.get("data", "{}"))

        # Build the row matching your BigQuery table schema
        row = {
            "mongoId": document_id,
            "sync_timestamp": datetime.datetime.utcnow().isoformat() + "Z",
            "deleted": False,
            # Map your document fields here:
            # "fieldName": doc.get("fieldName"),
            # Use helper functions below for special MongoDB types
        }
        errors = client.insert_rows_json(table_id, [row])
        if errors:
            print(f"BigQuery errors: {errors}")
            return json.dumps({"error": str(errors)}), 500
        return json.dumps({"success": True, "action": "inserted"}), 200
    except Exception as e:
        print(f"Exception: {e}")
        return json.dumps({"error": str(e)}), 500
```
MongoDB Extended JSON Helpers
MongoDB triggers send documents in Extended JSON format. Use these helper functions to extract values:
```python
def get_id(obj):
    """Extract string ID from {"$oid": "..."} or plain string."""
    if obj is None:
        return None
    if isinstance(obj, str):
        return obj
    if isinstance(obj, dict) and "$oid" in obj:
        return obj.get("$oid")
    return str(obj)


def get_date(obj):
    """Convert {"$date": {"$numberLong": "..."}} to ISO timestamp."""
    if obj is None:
        return None
    if isinstance(obj, str):
        return obj
    if isinstance(obj, dict) and "$date" in obj:
        date_val = obj.get("$date")
        if isinstance(date_val, dict) and "$numberLong" in date_val:
            ts = int(date_val["$numberLong"]) / 1000
            return datetime.datetime.utcfromtimestamp(ts).isoformat() + "Z"
        return date_val
    return None


def get_number(obj):
    """Handle {"$numberDouble": "..."} and {"$numberLong": "..."}."""
    if obj is None:
        return None
    if isinstance(obj, (int, float)):
        return float(obj)
    if isinstance(obj, dict) and "$numberDouble" in obj:
        val = obj["$numberDouble"]
        if val in ["Infinity", "-Infinity", "NaN"]:
            return None
        return float(val)
    if isinstance(obj, dict) and "$numberLong" in obj:
        return int(obj["$numberLong"])
    return None


def get_location(loc):
    """Convert {lat, lng} to BigQuery GEOGRAPHY format POINT(lng lat)."""
    if not loc:
        return None
    lat = get_number(loc.get("lat")) if isinstance(loc.get("lat"), dict) else loc.get("lat")
    lng = get_number(loc.get("lng")) if isinstance(loc.get("lng"), dict) else loc.get("lng")
    if lat is not None and lng is not None:
        return f"POINT({lng} {lat})"
    return None
```
Usage in the row mapping:
```python
row = {
    "mongoId": document_id,
    "sync_timestamp": datetime.datetime.utcnow().isoformat() + "Z",
    "someReference": get_id(doc.get("someReference")),  # ObjectId field
    "createdAt": get_date(doc.get("createdAt")),        # Date field
    "amount": get_number(doc.get("amount")),            # Number field
    "location": get_location(doc.get("location")),      # {lat, lng} field
    "name": doc.get("name"),                            # Plain string
}
```
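The date and number helpers can be sanity-checked standalone; the definitions below are repeated from above so the snippet runs on its own:

```python
import datetime


def get_date(obj):
    """Convert {"$date": {"$numberLong": "..."}} to ISO timestamp."""
    if obj is None:
        return None
    if isinstance(obj, str):
        return obj
    if isinstance(obj, dict) and "$date" in obj:
        date_val = obj.get("$date")
        if isinstance(date_val, dict) and "$numberLong" in date_val:
            ts = int(date_val["$numberLong"]) / 1000
            return datetime.datetime.utcfromtimestamp(ts).isoformat() + "Z"
        return date_val
    return None


def get_number(obj):
    """Handle {"$numberDouble": "..."} and {"$numberLong": "..."}."""
    if obj is None:
        return None
    if isinstance(obj, (int, float)):
        return float(obj)
    if isinstance(obj, dict) and "$numberDouble" in obj:
        val = obj["$numberDouble"]
        if val in ["Infinity", "-Infinity", "NaN"]:
            return None
        return float(val)
    if isinstance(obj, dict) and "$numberLong" in obj:
        return int(obj["$numberLong"])
    return None


print(get_date({"$date": {"$numberLong": "1717500000000"}}))  # 2024-06-04T11:20:00Z
print(get_number({"$numberDouble": "19.99"}))                 # 19.99
print(get_number({"$numberLong": "42"}))                      # 42
```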
Adding a Trigger for a New Collection
To sync an additional collection, create a new trigger following the same pattern:
- Add a Trigger in App Services
- Set the Collection Name to the new collection
- Use the same function template above; the collection field in the payload is automatically set from changeEvent.ns.coll
- Update the Cloud Function to handle the new collection's data
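When several collections post to the same Cloud Function, the collection field can drive the row mapping. A minimal dispatch sketch (the collection and field names here are hypothetical, not from the original setup):

```python
import json


def build_row(data):
    """Pick a row mapping based on the source collection in the payload."""
    collection = data.get("collection")
    doc = json.loads(data.get("data", "{}"))
    if collection == "orders":
        return {"mongoId": data.get("document_id"), "amount": doc.get("amount")}
    if collection == "customers":
        return {"mongoId": data.get("document_id"), "name": doc.get("name")}
    raise ValueError(f"Unhandled collection: {collection}")
```

Raising on an unknown collection surfaces misconfigured triggers in the function logs instead of silently writing incomplete rows.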
Best Practices
- Full Document: always enable it so that insert/update events contain the complete document
- Null check: always guard against a missing fullDocument before processing; it can be null in edge cases
- Logging: use console.log() to log the response status and body (visible in App Services > Logs)
- Monitoring: check trigger logs in App Services > Logs to diagnose failures