Overview

This short guide explains how use python to generate an image using RunPod.


What it does

  • Creates a RunPod job using the /run endpoint with a text prompt.
  • Polls the /status/{id} endpoint until the job reaches COMPLETED.
  • Reads output.image_url (a data:image/png;base64,... string), decodes the base64 payload, and saves it as {job_id}.png (or into --outdir).

The script is organized with small functions:

  • create_job(prompt, key, base_url) — create a job and return the API response JSON.
  • poll_job(job_id, key, base_url, timeout) — poll until completion and return the final JSON payload.
  • save_image_from_output(output, job_id, out_dir) — decode and write the PNG to disk.

Prerequisites

  • requests package (install with pip install -r requirements.txt or pip install requests)
  • A RunPod API key

Set your key using an environment variable:

export RUNPOD_BASE_URL="https://api.runpod.ai/v2/..."
export RUNPOD_KEY="rpa_..."

Tip: You can also pass --key on the command line to override the env var.


Usage examples

Create a new job (script will create, poll, and save image):

python main.py --prompt "an astronaut riding a horse on Mars" --outdir ./images

Poll an existing job (skip creation):

python main.py --id 7acd81e5-57ee-4d2b-8d94-8f970176fe9a-e2 --outdir ./images

Control polling timeout (seconds):

python main.py --timeout 120

Adjust logging verbosity:

export LOG_LEVEL=DEBUG
python main.py --prompt "..."

How it all works

  • The script expects the RunPod status API response to include at least the following JSON shape:
{
  "id": "...",
  "status": "IN_QUEUE|RUNNING|COMPLETED|ERROR",
  "delayTime": 1168,
  "output": {
    "image_url": "..."
  }
}
  • On COMPLETED, output.image_url must contain a data:image/png;base64, prefixed base64 string — the script strips the prefix and decodes the rest.
  • On FAILED or ERROR, the script raises an error and logs details.
  • Polling honors a delayTime value if supplied (converted from ms to seconds) or falls back to a default interval.

Example code is :

import argparse
import base64
import logging
import os
import sys
import time
from typing import Optional, Dict, Any

import requests

# Configuration
DEFAULT_BASE_URL = os.environ.get("RUNPOD_BASE_URL", "https://api.runpod.ai/v2/XXXX") 
DEFAULT_TIMEOUT = 600  # seconds

# Try to read key from environment first; fallback to hard-coded value if present
KEY = os.environ.get("RUNPOD_KEY", "rpa_XXXX")

# Logging
logging.basicConfig(
    level=os.environ.get("LOG_LEVEL", "INFO"),
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)


def create_job(prompt: str, key: Optional[str] = None, base_url: str = DEFAULT_BASE_URL) -> Dict[str, Any]:
    key = key or KEY
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {key}"}
    data = {"input": {"prompt": prompt}}
    logger.info("Creating job with prompt: %s", prompt)
    r = requests.post(f"{base_url}/run", headers=headers, json=data)
    r.raise_for_status()
    return r.json()


def poll_job(job_id: str, key: Optional[str] = None, base_url: str = DEFAULT_BASE_URL, timeout: int = DEFAULT_TIMEOUT) -> Dict[str, Any]:
    key = key or KEY
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {key}"}
    status_url = f"{base_url}/status/{job_id}"

    start = time.time()
    while True:
        r = requests.get(status_url, headers=headers)
        r.raise_for_status()
        status_json = r.json()
        status = status_json.get("status")
        logger.info("Job %s status: %s", job_id, status)

        if status == "COMPLETED":
            logger.info("Job %s completed", job_id)
            return status_json

        if status in ("FAILED", "ERROR"):
            logger.error("Job %s ended with status: %s", job_id, status)
            raise RuntimeError(f"Job ended with status: {status}")

        elapsed = time.time() - start
        if elapsed > timeout:
            logger.error("Polling timed out after %s seconds for job %s", timeout, job_id)
            raise TimeoutError("Polling timed out")

        delay = status_json.get("delayTime")
        if isinstance(delay, (int, float)) and delay > 0:
            wait = min(1, delay / 1000.0 + 0.5)
        else:
            wait = 3
        time.sleep(wait)


def save_image_from_output(output: Dict[str, Any], job_id: str, out_dir: Optional[str] = None) -> str:
    image_url = output.get("image_url")
    if not image_url:
        raise RuntimeError("No 'image_url' found in output")

    prefix = "data:image/png;base64,"
    if image_url.startswith(prefix):
        b64 = image_url[len(prefix):]
    else:
        b64 = image_url

    imgdata = base64.b64decode(b64)

    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
        out_file = os.path.join(out_dir, f"{job_id}.png")
    else:
        out_file = f"{job_id}.png"

    with open(out_file, "wb") as f:
        f.write(imgdata)

    logger.info("Saved image to %s", out_file)
    return out_file


def main(argv=None):
    parser = argparse.ArgumentParser(description="Run a RunPod job and save returned image")
    parser.add_argument("--prompt", default="create an image of a futuristic cityscape at sunset", help="Prompt to send to the model")
    parser.add_argument("--key", help="RunPod API key (overrides env var)")
    parser.add_argument("--id", help="If provided, skip creating a job and poll this job id instead")
    parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help="Polling timeout in seconds")
    parser.add_argument("--outdir", help="Directory to save the resulting image")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="Base URL for the RunPod API")
    args = parser.parse_args(argv)

    key = args.key or KEY
    if not key:
        logger.error("No API key provided (set RUNPOD_KEY env or use --key)")
        sys.exit(1)

    try:
        if args.id:
            job_id = args.id
            logger.info("Using provided job id: %s", job_id)
        else:
            resp = create_job(args.prompt, key=key, base_url=args.base_url)
            logger.debug("Create response: %s", resp)
            job_id = resp.get("id")
            if not job_id:
                logger.error("No job id returned from run endpoint")
                sys.exit(1)

        status_json = poll_job(job_id, key=key, base_url=args.base_url, timeout=args.timeout)
        output = status_json.get("output", {})
        save_image_from_output(output, job_id, out_dir=args.outdir)
    except Exception:
        logger.exception("An error occurred")
        raise


if __name__ == "__main__":
    main()