axios_python

A developer-experience-first HTTP client for Python, heavily inspired by Axios. Built natively on httpx, it adds a network orchestration layer that raw transport libraries don't provide — lifecycle hooks, middleware pipelines, interceptors, automatic retries, and a unified sync/async API.


Why axios_python?

Python's HTTP ecosystem (requests, httpx, aiohttp) excels at transport. But modern applications need more:

Capability | requests | httpx | axios_python
Sync requests
Async requests
Request interceptors
Middleware pipeline
Built-in retry engine
Plugin system
Request cancellation
Isolated client instances
Swappable transport

Features

  • 🌐 Instance-based clients: completely isolated state, headers, and config per API.
  • 🔄 Unified sync/async API: api.get() and await api.async_get() share an identical interface.
  • 🔗 Interceptors: hook into requests before they're sent or responses before they're returned.
  • 🚰 Middleware pipeline: Express.js-style async middleware for timing, tracing, and custom caching.
  • 🔁 Retry engine: built-in linear, fixed, and exponential backoff strategies.
  • 🚫 Cancellation tokens: cleanly abort in-flight requests at any point.
  • 🔌 Plugin system: drop in Cache, Auth, and Logger plugins with a single line.
  • 🧩 Swappable transport: httpx by default; bring your own adapter via BaseTransport.
  • 📝 Fully typed: 100% strict type annotations for Python 3.10+.

Installation

pip install axios_python
uv add axios_python
poetry add axios_python

Requires Python 3.10 or higher. httpx is installed automatically as a dependency.


Quick Start

Make a one-off request

No setup required. Use the module-level helpers for quick scripts or exploration.

import axios_python

response = axios_python.get("https://httpbin.org/get", params={"q": "python"})

print(response.status_code)  # 200
print(response.ok)           # True
print(response.json())       # Parsed JSON body

Create a configured client instance

For real applications, create an isolated api instance with shared defaults. Each instance maintains its own headers, interceptors, middleware, and plugin state — completely independent from others.

import axios_python

api = axios_python.create({
    "base_url": "https://api.myapp.com/v1",
    "timeout": 15,
    "headers": {
        "X-App-Version": "2.0",
        "Accept": "application/json",
    },
})

response = api.get("/users", params={"page": 1})
print(response.json())

Use sync or async — your choice

The API is identical. Prefix any method with async_ to get a non-blocking coroutine.

# Sync
import axios_python

api = axios_python.create({"base_url": "https://httpbin.org"})

response = api.get("/get", params={"query": "python"})
print(f"Status: {response.status_code}")

# Async
import asyncio
import axios_python

api = axios_python.create({"base_url": "https://httpbin.org"})

async def main():
    response = await api.async_get("/delay/2")
    print(response.data)

asyncio.run(main())

Handle file uploads and streams

Multipart file uploads follow the familiar requests interface — pass a file handle or a (filename, handle, mimetype) tuple.

import axios_python

with open("report.csv", "rb") as f:
    response = axios_python.post(
        "https://httpbin.org/post",
        files={"file": ("report.csv", f, "text/csv")},
    )

print(response.status_code)

Pass stream=True to avoid buffering large responses into memory. The response becomes a context manager exposing iter_bytes() and aiter_lines().

# Sync streaming
import axios_python

def process(chunk: bytes) -> None:
    # Placeholder for your own chunk handling.
    print(f"received {len(chunk)} bytes")

with axios_python.get("https://httpbin.org/stream-bytes/1024", stream=True) as resp:
    for chunk in resp.iter_bytes(chunk_size=64):
        process(chunk)

# Async streaming
async def stream_data():
    async with await axios_python.async_get("https://.../stream", stream=True) as resp:
        async for line in resp.aiter_lines():
            print(line)

Core Concepts

Interceptors

Interceptors let you tap into the request/response lifecycle. They run sequentially in the order they are registered and are ideal for cross-cutting concerns like authentication, logging, and response normalization.

import axios_python

api = axios_python.create({"base_url": "https://api.myapp.com"})

# get_current_token() and refresh_token() stand in for your application's own helpers.

# --- Request interceptor ---
def attach_auth(config):
    config["headers"]["Authorization"] = f"Bearer {get_current_token()}"
    return config

api.interceptors.request.use(attach_auth)

# --- Response interceptor ---
def unwrap_envelope(response):
    response.data = response.json().get("data", response.data)
    return response

# --- Error interceptor ---
def handle_401(error):
    if error.response and error.response.status_code == 401:
        refresh_token()
    raise error

# Register the success and error handlers together, once.
# Registering unwrap_envelope on its own as well would add it to the chain twice.
api.interceptors.response.use(unwrap_envelope, handle_401)

Interceptors are synchronous by design for simplicity. For async-capable wrapping logic (e.g., distributed tracing, async token refresh), use Middleware instead.
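
To make the ordering concrete, here is a minimal, library-independent sketch of how a chain of synchronous interceptors could be applied in registration order. InterceptorChain and its run() method are hypothetical names used only for illustration; they are not axios_python's internals, and error handlers are left out to keep the idea small.

```python
from typing import Any, Callable

Handler = Callable[[Any], Any]


class InterceptorChain:
    """Hypothetical stand-in for an interceptor registry (not the library's class)."""

    def __init__(self) -> None:
        self._handlers: list[Handler] = []

    def use(self, handler: Handler) -> None:
        # Handlers are stored in the order they are registered.
        self._handlers.append(handler)

    def run(self, value: Any) -> Any:
        # Each handler receives the value returned by the previous one.
        for handler in self._handlers:
            value = handler(value)
        return value


def attach_auth(config: dict) -> dict:
    config["headers"]["Authorization"] = "Bearer demo-token"
    return config


def record_auth_seen(config: dict) -> dict:
    # Runs second, so it can observe the header added by attach_auth.
    config["auth_seen"] = "Authorization" in config["headers"]
    return config


chain = InterceptorChain()
chain.use(attach_auth)
chain.use(record_auth_seen)

result = chain.run({"url": "/users", "headers": {}})
print(result["auth_seen"])  # True
```

Because the second handler sees the first handler's changes, registration order matters: an interceptor that depends on a header must be registered after the one that sets it.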


Middleware

Middleware wraps the entire request pipeline, giving you control both before and after the underlying transport call. This is the right tool for timing, distributed tracing, circuit breakers, and custom caching.

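import time
import uuid

import axios_python

api = axios_python.create({"base_url": "https://api.myapp.com"})


def generate_trace_id():
    # Illustrative helper; substitute your tracing system's ID generator.
    return uuid.uuid4().hex


async def timing_middleware(ctx, next_fn):
    start = time.monotonic()
    print(f"→ {ctx['method'].upper()} {ctx['url']}")

    result = await next_fn(ctx)

    elapsed = (time.monotonic() - start) * 1000
    print(f"← {result.status_code} in {elapsed:.1f}ms")
    return result


async def trace_middleware(ctx, next_fn):
    ctx["headers"]["X-Trace-Id"] = generate_trace_id()
    return await next_fn(ctx)


api.use(trace_middleware)   # registered first, so it runs first
api.use(timing_middleware)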

Middleware is executed in the order it is registered. The call to await next_fn(ctx) passes control to the next middleware — or to the transport layer if it is the last in the chain.
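
The wrapping behavior described above can be illustrated with a small, self-contained sketch. The compose() helper, the fake_transport() coroutine, and the events list are hypothetical and exist only for this example; the sketch assumes the usual "onion" model, where code after await next_fn(ctx) runs in reverse order on the way back out.

```python
import asyncio
from typing import Any, Awaitable, Callable

Next = Callable[[dict], Awaitable[Any]]
Middleware = Callable[[dict, Next], Awaitable[Any]]


def _bind(mw: Middleware, next_fn: Next) -> Next:
    # Fix one middleware to the handler that should run after it.
    async def bound(ctx: dict) -> Any:
        return await mw(ctx, next_fn)
    return bound


def compose(middlewares: list[Middleware], transport: Next) -> Next:
    """Wrap `transport` so that middlewares[0] becomes the outermost layer."""
    handler = transport
    # Build from the inside out so the first registered middleware runs first.
    for mw in reversed(middlewares):
        handler = _bind(mw, handler)
    return handler


events: list[str] = []


async def outer(ctx: dict, next_fn: Next) -> Any:
    events.append("outer:before")
    result = await next_fn(ctx)
    events.append("outer:after")
    return result


async def inner(ctx: dict, next_fn: Next) -> Any:
    events.append("inner:before")
    result = await next_fn(ctx)
    events.append("inner:after")
    return result


async def fake_transport(ctx: dict) -> dict:
    # Stands in for the real network call.
    events.append("transport")
    return {"status_code": 200}


pipeline = compose([outer, inner], fake_transport)
result = asyncio.run(pipeline({"method": "get", "url": "/users"}))
print(events)
# ['outer:before', 'inner:before', 'transport', 'inner:after', 'outer:after']
```

Under this model, a timing middleware registered first measures everything registered after it, including the transport call.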


Retry Engine

Network hiccups shouldn't take down your app. Configure a retry strategy when creating your client, and axios_python handles back-off transparently.

# Exponential backoff
import axios_python
from axios_python import ExponentialBackoff

api = axios_python.create({
    "base_url": "https://api.myapp.com",
    "max_retries": 4,
    "retry_strategy": ExponentialBackoff(
        base=0.5,
        multiplier=2.0,
        max_delay=10.0,
    ),
})

# Linear backoff
import axios_python
from axios_python import LinearBackoff

api = axios_python.create({
    "base_url": "https://api.myapp.com",
    "max_retries": 3,
    "retry_strategy": LinearBackoff(delay=1.0),  # 1s, 2s, 3s
})

# Fixed backoff
import axios_python
from axios_python import FixedBackoff

api = axios_python.create({
    "base_url": "https://api.myapp.com",
    "max_retries": 5,
    "retry_strategy": FixedBackoff(delay=2.0),  # Always 2s
})

By default, retries trigger on network errors and timeouts. To also retry on specific HTTP status codes:

import axios_python
from axios_python import ExponentialBackoff

api = axios_python.create({
    "base_url": "https://api.myapp.com",
    "max_retries": 3,
    "retry_strategy": ExponentialBackoff(
        base=1.0,
        multiplier=2.0,
        retry_on_status=[429, 502, 503, 504],
    ),
})
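
To get a feel for the wait times each strategy produces, the sketch below computes delay schedules in plain Python. The formulas are assumptions: exponential delays are taken to be base * multiplier ** attempt capped at max_delay, and linear delays follow the "1s, 2s, 3s" pattern noted above. The function names are hypothetical, and the library's actual schedule (for example, whether it adds jitter) may differ.

```python
def exponential_delays(base: float, multiplier: float, max_delay: float, retries: int) -> list[float]:
    # Assumed formula: base * multiplier^attempt, capped at max_delay.
    return [min(base * multiplier ** attempt, max_delay) for attempt in range(retries)]


def linear_delays(delay: float, retries: int) -> list[float]:
    # Assumed formula: the delay grows by one step per attempt.
    return [delay * attempt for attempt in range(1, retries + 1)]


def fixed_delays(delay: float, retries: int) -> list[float]:
    # The same wait before every retry.
    return [delay] * retries


print(exponential_delays(0.5, 2.0, 10.0, 4))  # [0.5, 1.0, 2.0, 4.0]
print(exponential_delays(0.5, 2.0, 10.0, 6))  # [0.5, 1.0, 2.0, 4.0, 8.0, 10.0]
print(linear_delays(1.0, 3))                  # [1.0, 2.0, 3.0]
print(fixed_delays(2.0, 5))                   # [2.0, 2.0, 2.0, 2.0, 2.0]
```

Summing a schedule gives the worst-case time a request spends waiting between attempts, which is worth checking against any upstream timeout.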

Request Cancellation

Use a CancelToken to abort in-flight requests — useful for search-as-you-type, user navigation, or timeout-driven cancellation.

import threading
import time
import axios_python
from axios_python import CancelToken

api = axios_python.create({"base_url": "https://httpbin.org"})
token = CancelToken()

def fetch():
    try:
        response = api.get("/delay/10", cancel_token=token)
        print(response.status_code)
    except axios_python.CancelError as e:
        print(f"Aborted: {e}")

thread = threading.Thread(target=fetch)
thread.start()

time.sleep(1.5)
token.cancel(reason="User navigated away")
thread.join()

In async code, CancelToken integrates seamlessly with asyncio.Task cancellation. You can also share a single token across multiple concurrent requests to abort them all at once.
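
The idea of one token aborting several operations can be sketched without the library. SketchCancelToken and slow_job() below are hypothetical stand-ins built on threading.Event; they show cooperative cancellation in general and make no claim about how CancelToken is implemented.

```python
import threading
from typing import Optional


class SketchCancelToken:
    """Hypothetical cooperative cancel token (not axios_python's CancelToken)."""

    def __init__(self) -> None:
        self._event = threading.Event()
        self.reason: Optional[str] = None

    def cancel(self, reason: str = "cancelled") -> None:
        self.reason = reason
        self._event.set()

    def wait(self, timeout: float) -> bool:
        # Returns True as soon as the token has been cancelled.
        return self._event.wait(timeout)


def slow_job(name: str, token: SketchCancelToken, outcomes: dict) -> None:
    # Simulate about one second of work in short slices, checking the token between slices.
    for _ in range(50):
        if token.wait(timeout=0.02):
            outcomes[name] = f"aborted: {token.reason}"
            return
    outcomes[name] = "finished"


token = SketchCancelToken()
outcomes: dict = {}
workers = [
    threading.Thread(target=slow_job, args=(name, token, outcomes))
    for name in ("a", "b", "c")
]
for worker in workers:
    worker.start()

# One call aborts every job that shares the token.
token.cancel(reason="User navigated away")
for worker in workers:
    worker.join()

print(outcomes)
```

All three jobs stop at their next check because they observe the same event, which is the property that makes a shared token useful for search-as-you-type or navigation scenarios.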


Concurrent Requests

Execute multiple requests in parallel and collect results together.

from concurrent.futures import ThreadPoolExecutor
import axios_python

api = axios_python.create({"base_url": "https://httpbin.org"})

endpoints = ["/get", "/ip", "/user-agent", "/headers"]

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(api.get, ep) for ep in endpoints]
    results = [f.result() for f in futures]

for response in results:
    print(response.status_code, response.url)

# The same fan-out with asyncio
import asyncio
import axios_python

api = axios_python.create({"base_url": "https://httpbin.org"})

async def main():
    tasks = [
        api.async_get("/get"),
        api.async_get("/ip"),
        api.async_get("/user-agent"),
    ]
    results = await asyncio.gather(*tasks)
    for response in results:
        print(response.status_code, response.json())

asyncio.run(main())

Plugins

Plugins extend client instances with reusable, composable behavior. Register them once; they apply to every request the instance makes.

Auth Plugin

Injects Authorization headers automatically. Supports static tokens or a dynamic provider function for token rotation.

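from axios_python import AuthPlugin

# Option 1: static token
api.plugin(AuthPlugin(scheme="Bearer", token="super-secret-key"))

# Option 2: dynamic provider for token rotation
# (vault stands in for your own secret store; register one option, not both)
api.plugin(AuthPlugin(
    scheme="Bearer",
    token_provider=lambda: vault.get_secret("api-token"),
))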

Cache Plugin

In-memory TTL cache for GET requests. Identical URLs return cached responses without a network round-trip.

from axios_python import CachePlugin

api.plugin(CachePlugin(
    ttl=120,
    max_size=256,
))

The Cache Plugin only caches GET requests. POST, PUT, PATCH, and DELETE requests always bypass the cache and additionally invalidate any cached entry for the same URL.
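
The caching rules above (TTL expiry, a size cap, GET-only storage, and invalidation on writes) can be sketched as follows. TTLCache and handle() are hypothetical names; the eviction policy (oldest entry first) and the injectable clock are assumptions made for the example, not details of CachePlugin.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Optional


class TTLCache:
    """Illustrative TTL cache keyed by URL; not the CachePlugin implementation."""

    def __init__(self, ttl: float, max_size: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl
        self.max_size = max_size
        self._clock = clock
        self._entries: OrderedDict = OrderedDict()  # url -> (expires_at, value)

    def get(self, url: str) -> Optional[Any]:
        entry = self._entries.get(url)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._entries[url]  # entry has expired
            return None
        return value

    def put(self, url: str, value: Any) -> None:
        self._entries[url] = (self._clock() + self.ttl, value)
        self._entries.move_to_end(url)
        while len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # assumed policy: evict the oldest entry

    def invalidate(self, url: str) -> None:
        self._entries.pop(url, None)


def handle(cache: TTLCache, method: str, url: str, fetch: Callable[[], Any]) -> Any:
    if method.upper() == "GET":
        cached = cache.get(url)
        if cached is not None:
            return cached
        response = fetch()
        cache.put(url, response)
        return response
    # Writes bypass the cache and drop any cached entry for the same URL.
    cache.invalidate(url)
    return fetch()


now = [0.0]  # fake clock so the example does not need to sleep
cache = TTLCache(ttl=120, max_size=256, clock=lambda: now[0])
calls: list = []


def fetch() -> dict:
    calls.append(1)
    return {"call": len(calls)}


first = handle(cache, "GET", "/users", fetch)    # miss, fetches
second = handle(cache, "GET", "/users", fetch)   # hit, no fetch
handle(cache, "POST", "/users", fetch)           # fetches and invalidates
third = handle(cache, "GET", "/users", fetch)    # miss again
now[0] = 121.0
fourth = handle(cache, "GET", "/users", fetch)   # expired, fetches
print(first, second, third, fourth)
```

Passing the clock in as a parameter is a design choice for testability: expiry can be exercised by advancing a counter rather than waiting for real time to pass.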


Logger Plugin

Structured output through Python's standard logging module for every request and response, with configurable verbosity.

import logging
from axios_python import LoggerPlugin

logging.basicConfig(level=logging.DEBUG)

api.plugin(LoggerPlugin(
    level=logging.INFO,
    log_headers=False,
    log_body=True,
))

Writing Custom Plugins

Implement the Plugin protocol — two optional hooks, zero boilerplate.

import uuid

from axios_python import Plugin

class CorrelationIdPlugin(Plugin):
    """Attaches a unique X-Correlation-Id header to every outgoing request."""

    def on_request(self, config: dict) -> dict:
        config["headers"]["X-Correlation-Id"] = str(uuid.uuid4())
        return config

    def on_response(self, response):
        # Expose the ID echoed back by the server, if any.
        cid = response.headers.get("X-Correlation-Id")
        response.correlation_id = cid
        return response

api.plugin(CorrelationIdPlugin())

Error Handling

axios_python uses a typed exception hierarchy rooted at AxiosPythonError. Every exception carries the original request config and, where applicable, the response object.

import axios_python

try:
    response = axios_python.get("https://httpbin.org/status/503")
    response.raise_for_status()

except axios_python.HTTPStatusError as e:
    print(f"HTTP {e.response.status_code}: {e.response.url}")

except axios_python.TimeoutError:
    print("Request exceeded the configured timeout.")

except axios_python.NetworkError:
    print("Could not reach the server. Check connectivity.")

except axios_python.RetryError as e:
    print(f"Exhausted all retry attempts. Last error: {e.__cause__}")

except axios_python.CancelError as e:
    print(f"Request was cancelled: {e}")

except axios_python.AxiosPythonError as e:
    print(f"Unexpected error: {e}")

Exception Hierarchy

AxiosPythonError
├── HTTPStatusError      # Non-2xx response (raise_for_status)
├── TimeoutError         # Connect or read timeout exceeded
├── NetworkError         # DNS failure, connection refused, etc.
├── RetryError           # All retry attempts exhausted
└── CancelError          # Request aborted via CancelToken
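
Because every exception derives from one base class, the order of except clauses matters: specific handlers have to come before the catch-all. The sketch below uses stand-in classes that mirror part of the tree above rather than importing the library; the constructor signature and the classify() helper are assumptions for illustration.

```python
class AxiosPythonError(Exception):
    """Stand-in base class mirroring the tree above (not imported from the library)."""

    def __init__(self, message: str, config=None, response=None) -> None:
        super().__init__(message)
        self.config = config      # assumed attribute: the original request config
        self.response = response  # assumed attribute: the response, where applicable


class HTTPStatusError(AxiosPythonError):
    pass


class NetworkError(AxiosPythonError):
    pass


class RetryError(AxiosPythonError):
    pass


def classify(exc: Exception) -> str:
    try:
        raise exc
    except HTTPStatusError:
        return "http-status"
    except RetryError as e:
        # __cause__ holds the error that triggered the final failed attempt.
        return f"retry-exhausted (cause: {type(e.__cause__).__name__})"
    except AxiosPythonError:
        # Catch-all for every other error in the hierarchy; must come last.
        return "other-library-error"


def make_retry_error() -> RetryError:
    try:
        raise RetryError("gave up") from NetworkError("connection refused")
    except RetryError as e:
        return e


print(classify(HTTPStatusError("503")))      # http-status
print(classify(make_retry_error()))          # retry-exhausted (cause: NetworkError)
print(classify(NetworkError("dns failure"))) # other-library-error
```

If the AxiosPythonError clause were listed first, it would match every case and the more specific handlers would never run.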

Configuration Reference

All options can be passed to axios_python.create(config) as instance-level defaults, or overridden per-request as keyword arguments.
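
One way to picture the relationship between instance defaults and per-request overrides is a simple merge, sketched below. merge_config() is a hypothetical helper, and the choice to merge headers key by key (rather than replace them wholesale) is an assumption; check the library's behavior before relying on it.

```python
from typing import Any


def merge_config(defaults: dict[str, Any], overrides: dict[str, Any]) -> dict[str, Any]:
    # Per-request values win over instance-level defaults.
    merged = {**defaults, **overrides}
    # Assumption: headers are combined key by key instead of being replaced as a whole.
    merged["headers"] = {**defaults.get("headers", {}), **overrides.get("headers", {})}
    return merged


defaults = {
    "base_url": "https://api.myapp.com/v1",
    "timeout": 15,
    "headers": {"Accept": "application/json"},
}

effective = merge_config(defaults, {"timeout": 60, "headers": {"X-Request-Id": "abc"}})
print(effective["timeout"])  # 60
print(effective["headers"])  # {'Accept': 'application/json', 'X-Request-Id': 'abc'}
print(defaults["timeout"])   # 15
```

The defaults dictionary is left untouched, so one request's overrides cannot leak into the next.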


Advanced

Custom Transports

Replace httpx entirely by implementing BaseTransport. This is useful for testing (mock transports), internal RPC protocols, or alternate HTTP backends.

from axios_python import AxiosPython, BaseTransport, Response

class MockTransport(BaseTransport):
    """Returns a canned 200 response for every request — great for unit tests."""

    def send(self, request) -> Response:
        return Response(
            status_code=200,
            headers={"Content-Type": "application/json"},
            data={"mock": True, "url": str(request.url)},
            request=request,
        )

    async def send_async(self, request) -> Response:
        return self.send(request)


api = AxiosPython(
    config={"base_url": "https://api.myapp.com"},
    transport=MockTransport(),
)

response = api.get("/users")
print(response.json())  # {"mock": True, "url": "https://api.myapp.com/users"}


License

Released under the MIT License. Contributions welcome.
