How to Build a Data Pipeline From Scratch
Build a reliable, scheduled data pipeline in Python from zero — covering extraction, transformation, loading, retries, logging, and monitoring with simple tools and no prior data engineering experience required.
June 2026 · 7 min read
Data pipelines might sound like something reserved for data engineers with six-figure salaries and an army of cloud services. The reality is simpler. If you’ve ever written a script that fetches data, transforms it, and saves it somewhere else, you’ve built one. Scaling that into something reliable and maintainable is the next step — and it doesn’t require magic.
Let’s walk through building a real pipeline from zero, using Python, a few standard-library modules (sqlite3, logging, concurrent.futures), and a handful of third-party packages (requests, schedule, tenacity, Flask). You’ll learn the architecture, the gotchas, and how to avoid the most common failure modes.
Start With the Data Source
Every pipeline has a source. It could be an API, a CSV file, a database, or a web scraper. For this example, let’s say you need to fetch cryptocurrency prices from a public API every five minutes and store them for analysis.
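import requests

def fetch_price(coin="bitcoin"):
    # Extract: ask CoinGecko's simple price endpoint for the coin's USD price
    url = "https://api.coingecko.com/api/v3/simple/price"
    params = {"ids": coin, "vs_currencies": "usd"}
    # A timeout stops a hung connection from stalling the pipeline indefinitely
    response = requests.get(url, params=params, timeout=10)
    return response.json()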
This is the “extract” phase. Keep it simple: a function that returns a dict. Don’t add error handling yet; just get it working.
Decide on Storage
You have options. A SQLite database is perfect for learning because it requires zero setup. For production, Postgres or a cloud data warehouse like BigQuery would be more appropriate. But start local.
import sqlite3

conn = sqlite3.connect("crypto_prices.db")
cursor = conn.cursor()
cursor.execute("""
    CREATE TABLE IF NOT EXISTS prices (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        coin TEXT,
        price REAL,
        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
    )
""")
conn.commit()
Notice the timestamp column with a default: SQLite fills it in with the current UTC date and time on every insert. Never trust a pipeline without timestamps; they are your main tool for debugging timing issues later.
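For example, once the table has been filling up for a while, the timestamps let you spot missed runs. The sketch below uses a hypothetical helper, `find_gaps`, that reads one coin's timestamps in order and reports any pair spaced further apart than the expected interval plus some slack. The five-minute interval matches the schedule in this example; the 60-second slack and the hand-written demo rows are assumptions for illustration.

```python
import sqlite3
from datetime import datetime, timedelta

def find_gaps(conn, coin="bitcoin", expected=timedelta(minutes=5), slack=timedelta(seconds=60)):
    """Return (earlier, later) timestamp pairs for one coin spaced further apart than expected + slack."""
    rows = conn.execute(
        "SELECT timestamp FROM prices WHERE coin = ? ORDER BY timestamp", (coin,)
    ).fetchall()
    # SQLite's CURRENT_TIMESTAMP default writes UTC text like "2026-06-01 12:00:00"
    times = [datetime.strptime(ts, "%Y-%m-%d %H:%M:%S") for (ts,) in rows]
    # Compare each timestamp with the one right after it
    return [(a, b) for a, b in zip(times, times[1:]) if b - a > expected + slack]

# Demo: an in-memory copy of the prices table with hand-written timestamps
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE prices (id INTEGER PRIMARY KEY AUTOINCREMENT, coin TEXT, "
    "price REAL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)"
)
conn.executemany(
    "INSERT INTO prices (coin, price, timestamp) VALUES (?, ?, ?)",
    [
        ("bitcoin", 43000.0, "2026-06-01 12:00:00"),
        ("bitcoin", 43000.0, "2026-06-01 12:05:00"),
        ("bitcoin", 43000.0, "2026-06-01 12:20:00"),  # two runs missing before this one
    ],
)
for start, end in find_gaps(conn):
    print(f"gap: {start} -> {end}")
```

Timestamps stored in this ISO-like text format sort correctly as strings, which is why ordering by the column works without any conversion.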
The Transform Step (Don’t Skip It)
Raw API data is rarely ready for storage. The CoinGecko response looks like {"bitcoin": {"usd": 43000}}. You need to flatten it.
def transform(raw):
    coin = list(raw.keys())[0]
    price = raw[coin]["usd"]
    return coin, price
This function is pure: it has no side effects. That makes it testable. You can write a unit test that passes in a mock dict and expects a tuple back. Confining side effects to extract and load, and keeping transform pure, is what distinguishes a hacked-together script from a real pipeline.
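Here is what such a test might look like, as a minimal sketch using Python's built-in unittest module. The transform function is repeated so the block runs on its own, and the sample dicts mimic the CoinGecko response shape shown above; the test class and method names are illustrative.

```python
import unittest

def transform(raw):
    # Same pure function as above: {"bitcoin": {"usd": 43000}} -> ("bitcoin", 43000)
    coin = list(raw.keys())[0]
    price = raw[coin]["usd"]
    return coin, price

class TransformTest(unittest.TestCase):
    def test_flattens_single_coin_response(self):
        raw = {"bitcoin": {"usd": 43000}}
        self.assertEqual(transform(raw), ("bitcoin", 43000))

    def test_missing_usd_price_raises(self):
        # A response without a USD price should fail loudly rather than load garbage
        with self.assertRaises(KeyError):
            transform({"bitcoin": {}})

if __name__ == "__main__":
    # exit=False runs the tests without calling sys.exit afterwards
    unittest.main(argv=["transform_tests"], exit=False)
```

Because transform never touches the network or the database, these tests run in milliseconds and need no mocking library.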
Load It
Now stitch the pieces together. Extract, transform, load (ETL).
def load(coin, price):
    cursor.execute("INSERT INTO prices (coin, price) VALUES (?, ?)", (coin, price))
    conn.commit()

def run_pipeline():
    raw = fetch_price()
    coin, price = transform(raw)
    load(coin, price)
    print(f"Loaded {coin}: ${price}")
Run it once. Check your database. If it works, you have a pipeline.
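To check the database, query the newest rows. The sketch below runs a load step against an in-memory SQLite database (an assumption, so it works without the network or the crypto_prices.db file) and reads the rows back. Unlike the article's version, this hypothetical `load` takes the connection as an argument, and `latest_rows` is an illustrative helper name.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for "crypto_prices.db"
conn.execute("""
    CREATE TABLE IF NOT EXISTS prices (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        coin TEXT,
        price REAL,
        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
    )
""")

def load(conn, coin, price):
    # Parameterized insert; SQLite fills in the timestamp column itself
    conn.execute("INSERT INTO prices (coin, price) VALUES (?, ?)", (coin, price))
    conn.commit()

def latest_rows(conn, limit=5):
    # Newest rows first: the quickest way to confirm a run actually landed
    return conn.execute(
        "SELECT coin, price, timestamp FROM prices ORDER BY id DESC LIMIT ?", (limit,)
    ).fetchall()

load(conn, "bitcoin", 43000.0)  # placeholder value standing in for a real API result
for row in latest_rows(conn):
    print(row)
```

Passing the connection in, rather than relying on a module-level cursor, is a small design change that makes load as easy to test as transform.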
Make It Run Automatically
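Manual execution isn’t a pipeline; it’s a chore. You need a scheduler. The simplest is a while True loop with time.sleep(), but hand-rolled timing logic gets messy once you have several jobs or want runs at specific times. Instead, use schedule, a lightweight third-party Python library (pip install schedule) with a readable API for declaring jobs.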
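import time

import schedule

# Register the job: call run_pipeline every five minutes
schedule.every(5).minutes.do(run_pipeline)

while True:
    # Run any job whose scheduled time has arrived, then check again in a second
    schedule.run_pending()
    time.sleep(1)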
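Run this in a terminal. Note that schedule waits one full interval before the first run, so the first row appears after about five minutes. Let it run for an hour, then check the database: you should see about a dozen rows. You now have a working, scheduled data pipeline.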
Handle Failures Without Panic
Everything will break eventually. The API goes down. Your internet drops. The database disk fills up. Plan for that.
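Add retries with tenacity, a third-party retry library:
import requests
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def fetch_price(coin="bitcoin"):
    url = "https://api.coingecko.com/api/v3/simple/price"
    response = requests.get(url, params={"ids": coin, "vs_currencies": "usd"}, timeout=10)
    response.raise_for_status()  # raise on HTTP errors (e.g. 429, 5xx) so tenacity retries them
    return response.json()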
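To see what those retry parameters are doing, here is a hand-rolled sketch of retry with capped exponential backoff. It is not tenacity's code: the `retry_with_backoff` helper and its `sleep` parameter are hypothetical, and its delay formula (double each time, clamped between a minimum and a maximum) only illustrates the idea behind `wait_exponential(min=2, max=10)`. Passing `sleep` in lets the demo record the delays instead of actually waiting.

```python
import time

def retry_with_backoff(func, attempts=3, base=1.0, min_wait=2.0, max_wait=10.0, sleep=time.sleep):
    """Call func() up to `attempts` times, sleeping a capped exponential delay between failures."""
    for attempt in range(attempts):
        try:
            return func()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: re-raise so the caller's error handling sees it
            # Delay doubles each attempt (base * 2**attempt), clamped to [min_wait, max_wait]
            sleep(min(max(base * 2 ** attempt, min_wait), max_wait))

# Demo: a stand-in for fetch_price that fails five times, then succeeds
calls = {"n": 0}

def flaky_fetch():
    calls["n"] += 1
    if calls["n"] <= 5:
        raise ConnectionError("API unavailable")
    return {"bitcoin": {"usd": 43000}}

waits = []  # collect the delays instead of sleeping
result = retry_with_backoff(flaky_fetch, attempts=6, sleep=waits.append)
print(waits)  # [2.0, 2.0, 4.0, 8.0, 10.0]
```

The delays grow quickly, then flatten at the cap, so a brief outage is retried fast while a long one doesn't hammer the API. One difference worth knowing: when tenacity runs out of attempts it raises its own RetryError by default (pass reraise=True to get the original exception instead), whereas this sketch re-raises the last exception directly. Either way, a try-except around the pipeline catches it.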
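Then wrap the whole pipeline in a try-except and log the error instead of crashing. This matters because schedule doesn’t catch exceptions: an error raised inside run_pipeline propagates out of run_pending() and stops the loop.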
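import logging

# Include a timestamp and level on every log line
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

def run_pipeline():
    try:
        raw = fetch_price()
        coin, price = transform(raw)
        load(coin, price)
        logging.info("Success: %s $%s", coin, price)
    except Exception:
        # logging.exception logs at ERROR level and appends the full traceback
        logging.exception("Pipeline failed")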
Monitor What Matters
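A silent failure is worse than a loud one: the script stops writing rows, nothing visibly crashes, and you discover the gap weeks later. You need alerts. On a single machine, a simple approach is a healthcheck endpoint that an uptime monitor can poll. Run it as its own process, since the scheduler’s while True loop never returns.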
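import sqlite3
from flask import Flask

app = Flask(__name__)

@app.route("/health")
def health():
    # Fresh connection per request: sqlite3 connections are tied to the thread that created them
    conn = sqlite3.connect("crypto_prices.db")
    try:
        count = conn.execute("SELECT COUNT(*) FROM prices").fetchone()[0]
    finally:
        conn.close()
    return f"OK - {count} records"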
Or use a service like Healthchecks.io that sends you an email if your pipeline stops pinging them. Either way, don’t rely on “I’ll check it manually.”
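A record count tells you the pipeline ran at some point, not that it is still running. A stricter check looks at the newest timestamp instead. The sketch below assumes that no new row for about two intervals (ten minutes) means trouble; `is_fresh` is a hypothetical helper, and it relies on SQLite's CURRENT_TIMESTAMP being stored as UTC text.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

def is_fresh(conn, max_age=timedelta(minutes=10), now=None):
    """Return True if the newest row in prices is no older than max_age."""
    newest_text = conn.execute("SELECT MAX(timestamp) FROM prices").fetchone()[0]
    if newest_text is None:
        return False  # empty table: nothing has ever loaded
    # CURRENT_TIMESTAMP is UTC text such as "2026-06-01 12:00:00"
    newest = datetime.strptime(newest_text, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    if now is None:
        now = datetime.now(timezone.utc)
    return now - newest <= max_age

# Demo against an in-memory table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE prices (coin TEXT, price REAL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)")
print(is_fresh(conn))  # empty table
conn.execute("INSERT INTO prices (coin, price) VALUES (?, ?)", ("bitcoin", 43000.0))
print(is_fresh(conn))  # row just inserted
```

In the Flask route, you could return `("STALE", 503)` when this check fails; Flask treats a (body, status) tuple as the response body and status code, and many uptime monitors treat any non-2xx status as down.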
Scale It Without Rewriting
Once this basic pipeline works, you’ll want to add more coins, more exchanges, or more frequent runs. Resist the urge to rewrite everything. Instead, make the design flexible.
- Store API keys and coin lists in a config file (JSON or YAML), not hardcoded.
- Use environment variables for database paths.
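- Keep database access inside load() so that moving from SQLite to Postgres means swapping the driver (for example, psycopg instead of sqlite3) and its placeholder style (%s instead of ?), not rewriting the pipeline.
- Parallelize with concurrent.futures if you have many API calls.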
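The first two bullets might look like this in practice. This is a sketch: the `config.json` file name, its `coins` key, the `PIPELINE_DB_PATH` environment variable, and the `load_settings` helper are all hypothetical names chosen for illustration.

```python
import json
import os
import tempfile

def load_settings(config_path="config.json"):
    """Read the coin list from a JSON file and the database path from the environment."""
    with open(config_path, encoding="utf-8") as f:
        config = json.load(f)
    # Fall back to the local SQLite file when the variable isn't set
    db_path = os.environ.get("PIPELINE_DB_PATH", "crypto_prices.db")
    return config["coins"], db_path

# Demo: write a throwaway config file, then read it back
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "config.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump({"coins": ["bitcoin", "ethereum", "solana"]}, f)
    coins, db_path = load_settings(path)
    print(coins, db_path)
```

Adding a coin then means editing config.json, and pointing the pipeline at a different database means setting one environment variable; neither touches the code.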
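import concurrent.futures

coins = ["bitcoin", "ethereum", "solana"]

# Fetch in worker threads, but write in the main thread: the sqlite3 connection can't be shared across threads
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for raw in executor.map(fetch_price, coins):
        coin, price = transform(raw)
        load(coin, price)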
The architecture stays the same. Only the details change.
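Threads are one option; fewer requests is another. CoinGecko's simple price endpoint accepts a comma-separated `ids` list and returns one key per coin, for example {"bitcoin": {"usd": ...}, "ethereum": {"usd": ...}}. The original transform only reads the first key, so a multi-coin variant needs to iterate over all of them. `transform_many` is a hypothetical name, and the sample dict is made-up input rather than real prices.

```python
def transform_many(raw):
    """Flatten {"coin": {"usd": price}, ...} into a list of (coin, price) tuples."""
    rows = []
    for coin, quotes in raw.items():
        # Skip a coin returned without a USD quote instead of failing the whole batch
        if "usd" in quotes:
            rows.append((coin, quotes["usd"]))
    return rows

sample = {"bitcoin": {"usd": 43000}, "ethereum": {"usd": 2300}, "solana": {}}
print(transform_many(sample))
```

With this, one call such as fetch_price("bitcoin,ethereum,solana") should return all three coins, and the load step simply loops over the returned tuples.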
What You Actually Built
You now have a solid starter pipeline. It has scheduling, retries, logging, storage, and a health check. It’s not a toy: a pipeline like this can run for months before you need an orchestration tool like Airflow or Prefect.
The most important lesson isn’t the code. It’s the mindset: start with the simplest thing that works, then add reliability layer by layer. Don’t over-engineer a pipeline that doesn’t exist yet.
And always timestamp your data. Future you will thank you.