Build a simple ETL Pipeline: FastAPI → JSON → CSV → SFTP (with Docker Compose)
- Mohammed Juyel Haque

- Aug 10
- 4 min read
A step-by-step blog post showing how to build a small ETL pipeline: SQL text read from .sql files is posted to a FastAPI endpoint (a mock that answers with dummy rows), the response is saved as JSON, that JSON is converted into CSV, and the CSV is uploaded to an SFTP server running locally via Docker Compose.
This is perfect for demos, small automation tasks or testing SFTP delivery in CI pipelines.
Goals
Provide a FastAPI-based mock API that runs SQL queries stored in files and returns JSON.
Save API results into a json_output/ folder.
Convert each JSON to CSV into csv_output/.
Upload CSVs to a local SFTP server (using atmoz/sftp in Docker Compose).
Provide runnable code, .env example, and instructions.

Directory Structure:
project-root/
├─ api/
│ ├─ __init__.py
│ └─ query_api.py
├─ logic/
│ ├─ __init__.py
│ └─ csv_manager.py
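├─ pdf/   (package imported as "from pdf import csv_generator"; a folder named csv/ would shadow Python's standard csv module)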
│ ├─ __init__.py
│ └─ csv_generator.py
├─ fast_api/
│ ├─ __init__.py
│ └─ mockapi.py
├─ docker/
│ └─ docker-compose.yml
├─ sftp/
│ ├─ __init__.py
│ └─ sftp_uploader.py
├─ json_output/
├─ csv_output/
├─ sftp_uploader/
├─ sql/
│ ├─ query1.sql
│ └─ query2.sql
├─ .env
├─ requirements.txt
├─ app.py
Prerequisites
Python 3.9+ (3.11 recommended)
Docker & Docker Compose
pip to install Python dependencies
Install Python deps:
pip install -r requirements.txt
requirements.txt minimal:
fastapi
uvicorn
python-dotenv
paramiko
requests
Environment file (.env)
TEST_API=http://127.0.0.1:8000/dummy-query
BEARER_TOKEN=dummy_token
SQL_DIR=./sql
JSON_OUTPUT_DIR=./json_output
CSV_OUTPUT_DIR=./csv_output
SFTP_HOST=127.0.0.1
SFTP_PORT=2222
SFTP_USER=testuser
SFTP_PASSWORD=testpass
SFTP_TARGET_DIR=/upload
app.py — main entry point
from logic import csv_manager as cs
def main():
    """Run the pipeline by delegating to ``csv_manager.generate_all_csv``."""
    cs.generate_all_csv()


if __name__ == "__main__":
    main()
Run with:
python app.py
mock_api.py — FastAPI endpoint to run a named SQL file
# mock_api.py
from fastapi import FastAPI, Request
from pydantic import BaseModel
from fastapi.responses import JSONResponse
import uvicorn
# ASGI application object; routes are registered on it via decorators.
app = FastAPI()


class QueryRequest(BaseModel):
    """JSON request body carrying a single string field, ``query``."""

    query: str
@app.post("/dummy-query")
async def handle_query(req: QueryRequest):
    """Answer any posted query with a fixed set of sample rows.

    The request body is validated against ``QueryRequest`` but its ``query``
    text is never used; the response is always the same two rows wrapped in
    a ``{"results": [...]}`` JSON object.
    """
    # Stand-in for rows a real database would return.
    sample_rows = [
        dict(id=1, name="Alice", email="alice@example.com"),
        dict(id=2, name="Bob", email="bob@example.com"),
    ]
    body = {"results": sample_rows}
    return JSONResponse(content=body)
# Start the mock API on all interfaces, port 8000, when run as a script.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

# Run with:
python mockapi.py
query_api.py — helper that runs SQL and writes JSON
import os
import requests
import json
from dotenv import load_dotenv
load_dotenv()
# URL the queries are POSTed to. DBSR_API is the name this module has always
# read; TEST_API is the name used in the sample .env file, so it is accepted
# as a fallback instead of silently leaving the URL as None.
DBSR_API = os.getenv("DBSR_API") or os.getenv("TEST_API")
BEARER_TOKEN = os.getenv("BEARER_TOKEN")

# Headers sent with every request to the query endpoint.
HEADER = {
    "Authorization": f"Bearer {BEARER_TOKEN}",
    "Content-Type": "application/json",
}
def read_sql_file(file_path):
    """Return the entire contents of the SQL file at *file_path* as text.

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the platform's default encoding (the JSON and CSV files in this pipeline
    are read and written as UTF-8 as well).

    Raises:
        OSError: if the file cannot be opened.
        UnicodeDecodeError: if the file is not valid UTF-8.
    """
    with open(file_path, "r", encoding="utf-8") as file:
        return file.read()
def fetch_data_from_sql_file(sql_file_path, json_filename):
    """Read the query stored in *sql_file_path* and pass it to ``fetch_data_and_save``.

    Returns whatever ``fetch_data_and_save`` returns for that query and
    *json_filename*.
    """
    sql_text = read_sql_file(sql_file_path)
    saved_path = fetch_data_and_save(sql_text, json_filename)
    return saved_path
def fetch_data_and_save(query, json_filename):
    """POST *query* to the configured API and save the returned rows as JSON.

    The request body is ``{"query": query}``; the rows are taken from the
    ``"results"`` key of the JSON response (an empty list if the key is
    absent). They are written to ``<JSON_OUTPUT_DIR>/<json_filename>.json``;
    the directory falls back to ``./json_output`` when the variable is unset
    and is created if missing.

    Returns:
        The path of the written JSON file, or ``None`` if the request failed
        or the response body was not valid JSON.
    """
    payload = {"query": query}
    try:
        # Without a timeout a dead endpoint would block the pipeline forever.
        response = requests.post(DBSR_API, headers=HEADER, json=payload, timeout=30)
        response.raise_for_status()
        data = response.json().get("results", [])
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # decode error does not derive from RequestException.
        print(f"Error fetching data: {e}")
        return None

    # os.makedirs(None) raises TypeError, so never pass an unset variable on.
    json_output_dir = os.getenv("JSON_OUTPUT_DIR") or "./json_output"
    os.makedirs(json_output_dir, exist_ok=True)
    json_file_path = os.path.join(json_output_dir, f"{json_filename}.json")
    with open(json_file_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4)
    print(f"Saved JSON data to {json_file_path}")
    return json_file_path
csv_generator.py — JSON → CSV
import csv
import json
import os
def generate_csv_from_json(json_file_path, csv_filename):
    """Convert a JSON file holding a list of row dictionaries into a CSV file.

    The CSV is written to ``<CSV_OUTPUT_DIR>/<csv_filename>.csv``; the
    directory falls back to ``./csv_output`` when the variable is unset and
    is created if missing. The header is the union of the keys of all rows
    in first-seen order, and a row lacking a column gets an empty cell.

    Returns:
        The path of the written CSV file, or ``None`` when the JSON file is
        missing, cannot be parsed, is not a list of dictionaries, or is an
        empty list.
    """
    if not os.path.exists(json_file_path):
        print(f"❌ JSON file not found: {json_file_path}")
        return None

    try:
        with open(json_file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except json.JSONDecodeError:
        print(f"❌ Invalid JSON format in {json_file_path}. Expected a list of dictionaries.")
        return None

    # Must be a non-empty list of dicts
    if not isinstance(data, list) or not all(isinstance(row, dict) for row in data):
        print(f"❌ Invalid JSON format in {json_file_path}. Expected a list of dictionaries.")
        return None
    if not data:
        print(f"⚠ No data to write for {csv_filename}")
        return None

    # Collect every key that appears in any row. Using only the first row's
    # keys makes DictWriter raise ValueError on a later row with extra keys.
    # dict.fromkeys de-duplicates while keeping first-seen order.
    keys = list(dict.fromkeys(key for row in data for key in row))

    csv_output_dir = os.getenv("CSV_OUTPUT_DIR") or "./csv_output"
    os.makedirs(csv_output_dir, exist_ok=True)
    csv_file_path = os.path.join(csv_output_dir, f"{csv_filename}.csv")
    with open(csv_file_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=keys)
        writer.writeheader()
        writer.writerows(data)
    print(f"✅ CSV file {csv_file_path} generated successfully.")
    return csv_file_path
sftp_uploader.py — upload CSVs to SFTP
import os
import posixpath
import paramiko
from dotenv import load_dotenv
load_dotenv()
def upload_to_sftp(local_folder):
    """Upload all files from a local folder to the SFTP target directory.

    Connection settings come from the environment: ``SFTP_HOST``,
    ``SFTP_PORT`` (default 22), ``SFTP_USER``, ``SFTP_PASSWORD`` and
    ``SFTP_TARGET_DIR`` (default ``"."``). Only regular files directly inside
    *local_folder* are uploaded; sub-directories are skipped. A file that
    fails to upload is reported and the remaining files are still attempted.

    Nothing is returned. Connection-level errors are printed rather than
    raised, and the connection is closed whether or not the upload succeeded.
    """
    host = os.getenv("SFTP_HOST")
    port = int(os.getenv("SFTP_PORT", 22))
    username = os.getenv("SFTP_USER")
    password = os.getenv("SFTP_PASSWORD")
    target_dir = os.getenv("SFTP_TARGET_DIR", ".")

    # Normalize for Windows paths
    local_folder = os.path.normpath(local_folder)
    if not os.path.isdir(local_folder):
        print(f"❌ ERROR: Folder not found: {local_folder}")
        return

    transport = None
    sftp = None
    try:
        # Connect to SFTP
        print(f"🔗 Connecting to SFTP: {username}@{host}:{port}")
        transport = paramiko.Transport((host, port))
        transport.connect(username=username, password=password)
        sftp = paramiko.SFTPClient.from_transport(transport)

        # Ensure target directory exists on remote
        _mkdir_p(sftp, target_dir)

        # Upload each file in folder
        for filename in os.listdir(local_folder):
            local_file_path = os.path.join(local_folder, filename)
            if not os.path.isfile(local_file_path):
                continue
            abs_file_path = os.path.abspath(local_file_path)
            # Always POSIX for remote paths
            remote_path = posixpath.join(target_dir, filename)
            print(f"⬆️ Uploading {abs_file_path} → {remote_path}")
            try:
                sftp.put(abs_file_path, remote_path)
                print(f"✅ Uploaded: {filename}")
            except Exception as e:
                print(f"❌ Failed to upload {filename}: {e}")

        print("🎯 All uploads complete.")
    except Exception as e:
        print(f"❌ SFTP upload failed: {type(e).__name__}: {e}")
    finally:
        # Close whatever was opened, even when connecting, directory creation
        # or listing failed part-way; otherwise the transport is leaked.
        if sftp is not None:
            sftp.close()
        if transport is not None:
            transport.close()
def _mkdir_p(sftp, remote_directory):
    """Create *remote_directory* and any missing parents on the SFTP server.

    The path is split on ``/`` and walked one component at a time, always
    addressed from the server root. A component whose ``stat`` raises
    ``FileNotFoundError`` is created and then chmod-ed to ``0o755``; a chmod
    failure is only reported, not raised.
    """
    partial = ""
    for segment in remote_directory.strip("/").split("/"):
        partial = posixpath.join(partial, segment)
        absolute = "/" + partial
        try:
            sftp.stat(absolute)
        except FileNotFoundError:
            print(f"📂 Creating remote dir: /{partial}")
            sftp.mkdir(absolute)
            try:
                # Ensure write permission
                sftp.chmod(absolute, 0o755)
            except Exception as chmod_err:
                print(f"⚠️ Could not set permissions for /{partial}: {chmod_err}")

# csv_manager.py — orchestration
from api import query_api as qa
from pdf import csv_generator as cg
from sftp import sftp_uploader as sftp
import os
# Folder holding the .sql files, read from the SQL_DIR environment variable.
SQL_DIR = os.getenv("SQL_DIR")

# SQL files to process: output base name -> SQL file name.
csvs = dict(test1="query1.sql", test2="query2.sql")
def generate_all_csv():
    """Run fetch → JSON → CSV for every entry in ``csvs``, then upload.

    For each ``name -> sql file`` pair the query is fetched and saved as
    JSON, then converted to CSV; an entry is skipped as soon as one of its
    steps returns a falsy path. Afterwards each folder that received a CSV
    in this run is uploaded once via ``upload_to_sftp``, which sends every
    file in that folder. Nothing is uploaded if no CSV was produced.
    """
    # Fall back to the sample layout instead of crashing in os.path.join
    # when SQL_DIR is not set.
    sql_dir = SQL_DIR or "./sql"
    output_dirs = []

    for filename, sql_file in csvs.items():
        sql_file_path = os.path.join(sql_dir, sql_file)

        # Step 1: Fetch and save JSON
        json_file_path = qa.fetch_data_from_sql_file(sql_file_path, filename)
        if not json_file_path:
            continue

        # Step 2: Convert JSON to CSV
        csv_file_path = cg.generate_csv_from_json(json_file_path, filename)
        if not csv_file_path:
            continue

        # Remember where the CSV actually landed rather than assuming a
        # machine-specific absolute path.
        csv_dir = os.path.dirname(csv_file_path) or "."
        if csv_dir not in output_dirs:
            output_dirs.append(csv_dir)

    # Step 3: Upload to SFTP (once per folder, since the uploader sends the
    # whole folder)
    for csv_dir in output_dirs:
        sftp.upload_to_sftp(csv_dir)

# docker-compose.yml (for local SFTP server)
# Local SFTP server for testing uploads.
services:
  sftp:
    image: atmoz/sftp
    container_name: dummy_sftp
    ports:
      # Host port 2222 (SFTP_PORT in .env) -> port 22 inside the container.
      - "2222:22"
    volumes:
      # Host folder (relative to this compose file) backing the upload directory.
      - ./sftp_upload:/home/testuser/upload:rw
    # Credentials match SFTP_USER / SFTP_PASSWORD in .env; the trailing
    # "upload" presumably names the directory created for the user — confirm
    # against the image documentation.
    command: testuser:testpass:::upload
Conclusion
This project demonstrates how to integrate FastAPI, file processing, and SFTP delivery into a clean, modular ETL workflow. You learned how to:
Serve a mock API that processes SQL queries from files.
Store API results as JSON.
Convert JSON to CSV for downstream processing.
Upload generated files to an SFTP server running in Docker Compose.


Comments