
Build a simple ETL Pipeline: FastAPI → JSON → CSV → SFTP (with Docker Compose)

  • Writer: Mohammed Juyel Haque
  • Aug 10
  • 4 min read

A step-by-step guide to building a small ETL pipeline: a FastAPI endpoint accepts SQL (loaded from .sql files), the result is saved as JSON, the JSON is converted to CSV, and the CSV is uploaded to an SFTP server running locally via Docker Compose.

This setup is well suited to demos, small automation tasks, and testing SFTP delivery in CI pipelines.

Goals

  • Provide a FastAPI-based mock API that runs SQL queries stored in files and returns JSON.

  • Save API results into a json_output/ folder.

  • Convert each JSON to CSV into csv_output/.

  • Upload CSVs to a local SFTP server (using atmoz/sftp in Docker Compose).

  • Provide runnable code, .env example, and instructions.
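
End to end, the flow is: sql/*.sql → FastAPI (/dummy-query) → json_output/*.json → csv_output/*.csv → SFTP /upload.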


Directory Structure:

project-root/
├─ api/
│  ├─ __init__.py
│  └─ query_api.py
├─ logic/
│  ├─ __init__.py
│  └─ csv_manager.py
├─ csv_gen/
│  ├─ __init__.py
│  └─ csv_generator.py
├─ fast_api/
│  ├─ __init__.py
│  └─ mock_api.py
├─ docker/
│  └─ docker-compose.yml
├─ sftp/
│  ├─ __init__.py
│  └─ sftp_uploader.py
├─ json_output/
├─ csv_output/
├─ sftp_upload/
├─ sql/
│  ├─ query1.sql
│  └─ query2.sql
├─ .env
├─ requirements.txt
└─ app.py

(The CSV package is named csv_gen/ rather than csv/ so it does not shadow Python's built-in csv module, and sftp_upload/ is the folder Docker Compose bind-mounts into the SFTP container.)

Prerequisites

  • Python 3.9+ (3.11 recommended)

  • Docker & Docker Compose

  • pip to install Python dependencies


Install Python deps:

pip install -r requirements.txt
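
If you want the dependencies isolated, a standard virtual environment works too (create and activate it before running the pip install above):

python -m venv .venv
source .venv/bin/activate    # on Windows: .venv\Scripts\activate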

requirements.txt minimal:

fastapi
uvicorn
python-dotenv
paramiko
requests

Environment file (.env)


TEST_API=http://127.0.0.1:8000/dummy-query
BEARER_TOKEN=dummy_token
SQL_DIR=./sql
JSON_OUTPUT_DIR=./json_output
CSV_OUTPUT_DIR=./csv_output
SFTP_HOST=127.0.0.1
SFTP_PORT=2222
SFTP_USER=testuser
SFTP_PASSWORD=testpass
SFTP_TARGET_DIR=/upload
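
These values are read at runtime with python-dotenv, for example:

from dotenv import load_dotenv
import os

load_dotenv()                  # loads variables from a nearby .env file
print(os.getenv("SFTP_HOST"))  # -> 127.0.0.1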

app.py — main entry point

from logic import csv_manager as cs

def main():
    cs.generate_all_csv()

if __name__ == "__main__":
    main()

Run it (start the mock API from the next section in another terminal first, since app.py posts queries to it):

python app.py

mock_api.py — FastAPI endpoint that accepts a SQL query and returns dummy results

# mock_api.py
from fastapi import FastAPI
from pydantic import BaseModel
from fastapi.responses import JSONResponse
import uvicorn

app = FastAPI()

class QueryRequest(BaseModel):
    query: str

@app.post("/dummy-query")
async def handle_query(req: QueryRequest):
    # Dummy rows shaped like a database result; the incoming SQL (req.query) is ignored
    fake_data = [
        {"id": 1, "name": "Alice", "email": "alice@example.com"},
        {"id": 2, "name": "Bob", "email": "bob@example.com"},
    ]
    return JSONResponse(content={"results": fake_data})

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

Run with:

python fast_api/mock_api.py
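
You can sanity-check the endpoint with curl; any SQL string works, since the mock ignores it:

curl -X POST http://127.0.0.1:8000/dummy-query \
  -H "Content-Type: application/json" \
  -d '{"query": "SELECT 1"}'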

query_api.py — helper that posts SQL to the API and saves the JSON response

import os
import requests
import json
from dotenv import load_dotenv

load_dotenv()

TEST_API = os.getenv("TEST_API")
BEARER_TOKEN = os.getenv("BEARER_TOKEN")
HEADER = {
    "Authorization": f"Bearer {BEARER_TOKEN}",
    "Content-Type": "application/json"
}

def read_sql_file(file_path):
    with open(file_path, 'r') as file:
        return file.read()

def fetch_data_from_sql_file(sql_file_path, json_filename):
    query = read_sql_file(sql_file_path)
    return fetch_data_and_save(query, json_filename)

def fetch_data_and_save(query, json_filename):
    payload = { "query": query }
    try:
        response = requests.post(TEST_API, headers=HEADER, json=payload)
        response.raise_for_status()
        data = response.json().get("results", [])
        
        json_output_dir = os.getenv("JSON_OUTPUT_DIR", "./json_output")
        os.makedirs(json_output_dir, exist_ok=True)
        json_file_path = os.path.join(json_output_dir, f"{json_filename}.json")

        with open(json_file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4)

        print(f"Saved JSON data to {json_file_path}")
        return json_file_path

    except requests.exceptions.RequestException as e:
        print(f"Error fetching data: {e}")
        return None
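
Used on its own (with the mock API running), the helper can be called directly; the paths below match the project tree:

from api import query_api as qa

# Reads sql/query1.sql, posts it to the API, writes json_output/test1.json
qa.fetch_data_from_sql_file("./sql/query1.sql", "test1")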

csv_generator.py — JSON → CSV


import csv
import json
import os

def generate_csv_from_json(json_file_path, csv_filename):
    if not os.path.exists(json_file_path):
        print(f"❌ JSON file not found: {json_file_path}")
        return None

    with open(json_file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Must be a non-empty list of dicts
    if not isinstance(data, list) or not all(isinstance(row, dict) for row in data):
        print(f"❌ Invalid JSON format in {json_file_path}. Expected a list of dictionaries.")
        return None

    if not data:
        print(f"⚠ No data to write for {csv_filename}")
        return None

    keys = list(data[0].keys())  # Use first row's keys
    csv_output_dir = os.getenv("CSV_OUTPUT_DIR", "./csv_output")
    os.makedirs(csv_output_dir, exist_ok=True)
    csv_file_path = os.path.join(csv_output_dir, f"{csv_filename}.csv")

    with open(csv_file_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=keys)
        writer.writeheader()
        writer.writerows(data)

    print(f"✅ CSV file {csv_file_path} generated successfully.")
    return csv_file_path
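
One caveat: the fieldnames come from the first row only, so a later row with an extra key makes csv.DictWriter raise a ValueError. If your queries can return ragged rows, taking the union of keys is safer; a minimal sketch (not part of the project code):

def collect_fieldnames(rows):
    """Union of keys across all rows, preserving first-seen order."""
    fieldnames = []
    for row in rows:
        for key in row:
            if key not in fieldnames:
                fieldnames.append(key)
    return fieldnames

# then: writer = csv.DictWriter(csvfile, fieldnames=collect_fieldnames(data), restval="")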

sftp_uploader.py — upload CSVs to SFTP

import os
import posixpath
import paramiko
from dotenv import load_dotenv

load_dotenv()


def upload_to_sftp(local_folder):
    """Upload all files from a local folder to the SFTP target directory."""
    host = os.getenv("SFTP_HOST")
    port = int(os.getenv("SFTP_PORT", 22))
    username = os.getenv("SFTP_USER")
    password = os.getenv("SFTP_PASSWORD")
    target_dir = os.getenv("SFTP_TARGET_DIR", ".")

    # Normalize for Windows paths
    local_folder = os.path.normpath(local_folder)

    if not os.path.isdir(local_folder):
        print(f"❌ ERROR: Folder not found: {local_folder}")
        return

    try:
        # Connect to SFTP (paramiko.Transport skips host-key verification, which is acceptable for this local test server)
        print(f"🔗 Connecting to SFTP: {username}@{host}:{port}")
        transport = paramiko.Transport((host, port))
        transport.connect(username=username, password=password)
        sftp = paramiko.SFTPClient.from_transport(transport)

        # Ensure target directory exists on remote
        _mkdir_p(sftp, target_dir)

        # Upload each file in folder
        for filename in os.listdir(local_folder):
            local_file_path = os.path.join(local_folder, filename)

            if os.path.isfile(local_file_path):
                abs_file_path = os.path.abspath(local_file_path)
                remote_path = posixpath.join(target_dir, filename)  # Always POSIX for remote paths

                print(f"⬆️ Uploading {abs_file_path} → {remote_path}")
                try:
                    sftp.put(abs_file_path, remote_path)
                    print(f"✅ Uploaded: {filename}")
                except Exception as e:
                    print(f"❌ Failed to upload {filename}: {e}")

        # Close connection
        sftp.close()
        transport.close()
        print("🎯 All uploads complete.")

    except Exception as e:
        print(f"❌ SFTP upload failed: {type(e).__name__}: {e}")


def _mkdir_p(sftp, remote_directory):
    """Recursively create remote directories on SFTP server."""
    dirs = remote_directory.strip("/").split("/")
    current_path = ""
    for d in dirs:
        current_path = posixpath.join(current_path, d)
        try:
            sftp.stat("/" + current_path)
        except FileNotFoundError:
            print(f"📂 Creating remote dir: /{current_path}")
            sftp.mkdir("/" + current_path)
            try:
                sftp.chmod("/" + current_path, 0o755)  # Ensure write permission
            except Exception as chmod_err:
                print(f"⚠️ Could not set permissions for /{current_path}: {chmod_err}")

csv_manager.py — orchestration

import os
from dotenv import load_dotenv

from api import query_api as qa
from csv_gen import csv_generator as cg
from sftp import sftp_uploader as sftp

load_dotenv()

SQL_DIR = os.getenv("SQL_DIR", "./sql")

# Map output name -> SQL file to process
csvs = {
    "test1": "query1.sql",
    "test2": "query2.sql"
}

def generate_all_csv():
    for filename, sql_file in csvs.items():
        sql_file_path = os.path.join(SQL_DIR, sql_file)

        # Step 1: Fetch and save JSON
        json_file_path = qa.fetch_data_from_sql_file(sql_file_path, filename)
        if not json_file_path:
            continue

        # Step 2: Convert JSON to CSV
        cg.generate_csv_from_json(json_file_path, filename)

    # Step 3: Upload the generated CSVs once, after the loop
    sftp.upload_to_sftp(os.getenv("CSV_OUTPUT_DIR", "./csv_output"))

docker-compose.yml (for local SFTP server)

services:
  sftp:
    image: atmoz/sftp
    container_name: dummy_sftp
    ports:
      - "2222:22"
    volumes:
      - ./sftp_upload:/home/testuser/upload:rw
    command: testuser:testpass:::upload
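
Start the server and verify the login (the sftp client ships with OpenSSH). If uploads later fail with permission errors, check the ownership of the bind-mounted ./sftp_upload folder on the host:

docker compose -f docker/docker-compose.yml up -d
sftp -P 2222 testuser@127.0.0.1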

Conclusion

This project demonstrates how to integrate FastAPI, file processing, and SFTP delivery into a clean, modular ETL workflow. You learned how to:

  • Serve a mock API that accepts SQL queries and returns JSON results.

  • Store API results as JSON.

  • Convert JSON to CSV for downstream processing.

  • Upload generated files to an SFTP server running in Docker Compose.

