
Building a Scalable Data Ingestion API with Python

In today’s era of monitoring and predictive analysis, having a flexible and scalable data ingestion system is crucial. In this post, we’ll guide you through creating a simple but powerful, scalable API using Python and Flask that can ingest custom metrics into an InfluxDB instance. The API is designed to accept any metric you define, store it with precise timestamps, and allow easy querying later.

This hands-on project will provide you with the building blocks to expand and customize your own data pipeline.

Step 1: Prerequisites

Before diving into the code, make sure you have:

  1. InfluxDB (Version 2.x) installed and running.
  2. Python 3.6+ installed on your system.
  3. A bucket created in InfluxDB for storing the metrics (e.g., snmp_monitoring).

Read our blog about Storing Network Monitoring Data in InfluxDB to set up your InfluxDB credentials.

Install the required Python libraries:

pip install flask influxdb-client

Step 2: Setting Up InfluxDB

You need to create a token in InfluxDB with write permissions for the bucket you’ll use. Once you have the token and bucket ready, note the following:

  • InfluxDB URL (default: http://localhost:8086)
  • Organization name
  • Bucket name
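
If you still need to create the bucket and token, the influx CLI can do both. The commands below are a minimal sketch, assuming an organization named my-org; substitute your own organization name and bucket ID.

# Create the bucket (30-day retention, as an example)
influx bucket create --name snmp_monitoring --org my-org --retention 30d

# Look up the bucket ID, then create a token with write access to that bucket
influx bucket list --org my-org
influx auth create --org my-org --write-bucket <bucket-id> --description "ingestion API token"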

Step 3: Building the Python API

Here’s the full Python code for the API:

from flask import Flask, request, jsonify
from influxdb_client import InfluxDBClient, Point, WritePrecision
import time
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# InfluxDB connection details
INFLUXDB_URL = 'http://localhost:8086'
INFLUXDB_TOKEN = 'your_influxdb_token'
INFLUXDB_ORG = 'your_organization'
INFLUXDB_BUCKET = 'snmp_monitoring'

# Initialize Flask app
app = Flask(__name__)

# Initialize InfluxDB client
client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
write_api = client.write_api()

@app.route('/api/submit', methods=['POST'])
def submit_data():
    logger.info("Incoming request: %s", request.json)

    # Get the request parameters
    field = request.json.get('field')
    value = request.json.get('value')
    timestamp = request.json.get('timestamp', int(time.time() * 1_000_000_000))

    if not field or value is None:
        logger.warning("Missing field or value in request")
        return jsonify({"error": "Field and value are required"}), 400

    try:
        value = float(value)
    except ValueError:
        logger.warning("Invalid value format: %s", value)
        return jsonify({"error": "Value must be a number"}), 400

    # Hostname/IP of the submitting client
    hostname = request.remote_addr

    # Validate the timestamp (the current time is used if none was provided)
    try:
        timestamp = int(timestamp)
    except ValueError:
        logger.warning("Invalid timestamp format: %s", timestamp)
        return jsonify({"error": "Invalid timestamp format"}), 400

    # Get extra tags (any keys other than 'field', 'value' or 'timestamp')
    extra_tags = {k: v for k, v in request.json.items() if k not in ['field', 'value', 'timestamp']}
    extra_tags['hostname'] = hostname  # Always add the hostname

    # Create an InfluxDB Point
    point = (
        Point(field)
        .field("value", value)
        .time(timestamp, WritePrecision.NS)
    )

    # Add extra tags to the point
    for tag_key, tag_value in extra_tags.items():
        point.tag(tag_key, str(tag_value))

    # Write to InfluxDB
    try:
        write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=point)
        logger.info("Data written to InfluxDB: %s", point)
        return jsonify({"status": "success", "field": field, "value": value, "extra_tags": extra_tags}), 200
    except Exception as e:
        logger.error("Failed to write to InfluxDB: %s", str(e))
        return jsonify({"error": "Failed to write to database", "details": str(e)}), 500

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Save this code as app.py.

This API accepts a JSON object. The keys ‘field’ and ‘value’ are required, because there is little sense in storing a measurement without a value. All other keys are treated as tags you can use to identify the measurement more precisely.
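
For example, a payload like the one below (the values are purely illustrative) stores cpu_load as the measurement with a value of 1.7, and turns datacenter and role into tags:

{
  "field": "cpu_load",
  "value": 1.7,
  "datacenter": "ams-1",
  "role": "database"
}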

Step 4: Running the API

Start your Flask application:

python3 app.py

The API will run on http://0.0.0.0:5000.

Running an API like this is fine while you’re still testing, but it’s definitely not a production solution.

Read our blog about using Gunicorn as a WSGI server. And while you’re browsing, also check out our blog about Adding A Security Layer to your API.
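
As a rough sketch of what that looks like, Gunicorn can serve the same app object directly (assuming it is installed in the same environment as app.py):

pip install gunicorn

# Serve the Flask app with 4 worker processes on port 5000
gunicorn --workers 4 --bind 0.0.0.0:5000 app:app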

Step 5: Submitting Metrics

Use curl to test the API:

curl -X POST http://127.0.0.1:5000/api/submit \
  -H "Content-Type: application/json" \
  -d '{
        "field": "os_updates",
        "value": 73,
        "hostname": "server01",
        "region": "EU-West",
        "environment": "production",
        "application": "backend"
      }'

Expected response:

{
  "extra_tags": {
    "application": "backend",
    "environment": "production",
    "hostname": "127.0.0.1",
    "region": "EU-West"
  },
  "field": "os_updates",
  "status": "success",
  "value": 73.0
}

Step 6: Querying Metrics in InfluxDB

To query your metrics, use the Influx CLI or the web UI:

influx query '
from(bucket: "snmp_monitoring")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "os_updates")
  |> limit(n: 10)
'

You can also visualize this data in tools like Grafana.

Example: Monitoring Server Updates with the API

Keeping track of available server updates is essential to maintaining the health and security of your infrastructure. Using our newly developed API, we can efficiently log the number of pending updates into our InfluxDB.

Here’s a simple script you can schedule as a cron job to automate this process:

#!/bin/bash

# Count the available updates, filtering out unnecessary output
UPDATE_COUNT=$(/usr/bin/dnf check-update 2>/dev/null | grep -vE "^(Dependencies resolved|Nothing to do|Complete|Security|Last meta)" | wc -l)

# Send the update count to the API
curl -X POST http://127.0.0.1:5000/api/submit \
  -H "Content-Type: application/json" \
  -d "{\"field\": \"os_updates\", \"value\": $UPDATE_COUNT}"

In this script:

  1. dnf check-update: Retrieves a list of available updates. Any unnecessary output (such as “Dependencies resolved” or “Nothing to do”) is filtered out using grep.
  2. wc -l: Counts the number of remaining lines to determine how many updates are available.
  3. curl: Sends this data to our API, which stores it in InfluxDB under the os_updates measurement.
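
To automate the collection, schedule the script with cron. The entry below is a sketch, assuming the script is saved as /usr/local/bin/report_updates.sh and made executable:

# Run the update check every hour, at minute 0
0 * * * * /usr/local/bin/report_updates.sh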

Viewing the Data in InfluxDB

Once the script runs and data is stored, you can query InfluxDB to view the collected information. For example, to see the data from the past 24 hours:

influx query '
from(bucket: "snmp_monitoring")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "os_updates")
'

This query will return the timestamps and update counts, giving you insight into when updates were available and their volume.

Conclusion

With this simple API, you’ve built a scalable system to ingest metrics into InfluxDB. The design is flexible, allowing you to add custom metrics from any device or service. Whether for SNMP monitoring, application telemetry, or any other use case, this API lays the groundwork for advanced data collection.