# Python demo script

# /// script
# requires-python = ">=3.10"
# dependencies = ["cryptography"]
# ///
"""
Due Managed Wallets API Demo
=============================
Self-contained script that walks through the complete managed wallets flow
against the Due sandbox API:

  1. Create first credential (primary signing key)
  2. Create a wallet (vault)
  3. Link the wallet to a Due account
  4. Add a second credential (backup key)
  5. Approve the backup credential using the primary key
  6. Create a second wallet using the backup key

Prerequisites
-------------
- Python 3.10+
- uv (https://docs.astral.sh/uv/) — recommended, handles dependencies automatically
- A Due sandbox API token (Bearer)
- A Due account ID

Running
-------
    uv run managed_wallet_demo.py

  The script will prompt for your API token and account ID.
  No manual `pip install` is needed — uv reads the inline metadata above
  and installs `cryptography` automatically.

  To reuse a primary key from a previous run:

    PRIMARY_KEY=primary_key.pem uv run managed_wallet_demo.py

  Alternatively, if you prefer pip:

    pip install cryptography
    python managed_wallet_demo.py

Important
---------
- Targets the sandbox environment (https://api.sandbox.due.network).
- On first run the primary private key is saved to `primary_key.pem`.
  Pass PRIMARY_KEY=primary_key.pem on subsequent runs to reuse it; running
  again without PRIMARY_KEY generates a new key and overwrites that file.
"""

import base64
import json
import os
import sys
import urllib.request
import urllib.error
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import hashes, serialization

BASE_URL = "https://api.sandbox.due.network"


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def generate_keypair() -> tuple[ec.EllipticCurvePrivateKey, str]:
    """Create a fresh EC key on the P-256 (SECP256R1) curve.

    Returns:
        A ``(private_key, public_pem)`` pair, where ``public_pem`` is the
        public half serialized as SubjectPublicKeyInfo PEM and decoded to
        ``str``.
    """
    key = ec.generate_private_key(ec.SECP256R1())
    pem_bytes = key.public_key().public_bytes(
        serialization.Encoding.PEM,
        serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    return key, pem_bytes.decode()


def export_private_pem(private_key: ec.EllipticCurvePrivateKey) -> str:
    """Serialize *private_key* as an unencrypted PKCS#8 PEM string."""
    pem_bytes = private_key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.PKCS8,
        serialization.NoEncryption(),
    )
    return pem_bytes.decode()


def load_private_key(pem_path: str) -> tuple[ec.EllipticCurvePrivateKey, str]:
    """Load an unencrypted EC private key from a PEM file.

    Args:
        pem_path: Path to the PEM file holding the private key.

    Returns:
        A ``(private_key, public_pem)`` pair, where ``public_pem`` is the
        matching public key as a SubjectPublicKeyInfo PEM string.

    Raises:
        OSError: If the file cannot be opened or read.
        TypeError: If the PEM holds a key that is not an elliptic-curve
            private key (e.g. RSA or Ed25519).

    Any error raised by ``load_pem_private_key`` for malformed or encrypted
    input propagates unchanged.
    """
    with open(pem_path, "rb") as f:
        private_key = serialization.load_pem_private_key(f.read(), password=None)

    # load_pem_private_key accepts any key type; the signing helpers in this
    # script only work with EC keys, so fail here with a clear message rather
    # than later inside sign().
    if not isinstance(private_key, ec.EllipticCurvePrivateKey):
        raise TypeError(
            f"{pem_path} does not contain an EC private key "
            f"(got {type(private_key).__name__})"
        )

    public_pem = private_key.public_key().public_bytes(
        serialization.Encoding.PEM,
        serialization.PublicFormat.SubjectPublicKeyInfo,
    ).decode()
    return private_key, public_pem


def sign_hex(private_key: ec.EllipticCurvePrivateKey, data: bytes) -> str:
    """Sign *data* with ECDSA over SHA-256 and return the signature as hex."""
    algorithm = ec.ECDSA(hashes.SHA256())
    return private_key.sign(data, algorithm).hex()


def sign_base64url(private_key: ec.EllipticCurvePrivateKey, data: bytes) -> str:
    """Sign *data* with ECDSA over SHA-256; return unpadded base64url text."""
    algorithm = ec.ECDSA(hashes.SHA256())
    raw_signature = private_key.sign(data, algorithm)
    encoded = base64.urlsafe_b64encode(raw_signature).decode()
    # Strip the trailing "=" padding characters.
    return encoded.rstrip("=")


def api_call(
    method: str,
    path: str,
    body: dict | None = None,
    token: str = "",
    account_id: str = "",
    expect_403: bool = False,
    *,
    timeout: float = 30.0,
) -> dict:
    """Send a JSON request to the API and return the parsed JSON response.

    Args:
        method: HTTP method, e.g. ``"POST"``.
        path: Path appended to ``BASE_URL``.
        body: JSON-serializable payload; ``None`` or an empty dict is sent
            as ``{}``.
        token: Bearer token; the ``Authorization`` header is omitted if empty.
        account_id: Value for the ``Due-Account-Id`` header; omitted if empty.
        expect_403: When true, a 403 response is treated as a normal response
            (the signing challenge) and its parsed body is returned.
        timeout: Seconds to wait on the network before giving up, so an
            unresponsive server cannot hang the script indefinitely.

    Returns:
        The decoded JSON response, or ``{}`` when a successful response has
        an empty body.

    Raises:
        urllib.error.HTTPError: For any HTTP error status other than an
            expected 403. The status and body are printed before re-raising.
        urllib.error.URLError: On connection failures, including timeouts.
    """
    url = BASE_URL + path
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    if account_id:
        headers["Due-Account-Id"] = account_id

    data = json.dumps(body or {}).encode()
    req = urllib.request.Request(url, data=data, headers=headers, method=method)

    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        # Decode leniently: a non-UTF-8 error page must not mask the HTTPError.
        resp_body = e.read().decode(errors="replace")
        if expect_403 and e.code == 403:
            return json.loads(resp_body)
        print(f"  HTTP {e.code}: {resp_body}")
        raise

    # A successful response without a body (e.g. 204) has nothing to parse.
    return json.loads(raw) if raw.strip() else {}


def challenge_response(
    path: str,
    payload: dict,
    private_key: ec.EllipticCurvePrivateKey,
    cred_id: str,
    token: str,
    account_id: str = "",
) -> dict:
    """Run the challenge-response signing flow (Pattern 2) for a POST.

    The payload is posted once; if the reply carries the code
    ``ACTION_SIGNATURE_REQUIRED``, its ``clientData`` is signed with
    *private_key* and the payload is posted again with a ``signature`` block
    attached. Any other reply is returned as-is.

    Returns:
        The parsed JSON of the final response.
    """
    first_reply = api_call(
        "POST", path, body=payload, token=token,
        account_id=account_id, expect_403=True,
    )

    # No challenge issued: the first request already gave the final answer.
    if first_reply.get("code") != "ACTION_SIGNATURE_REQUIRED":
        return first_reply

    challenge = first_reply["data"]
    challenge_id = challenge["challengeIdentifier"]
    encoded_client_data = challenge["factors"]["Key"]["clientData"]

    # clientData may arrive as base64url without padding; restore the "="
    # characters before decoding.
    missing_padding = -len(encoded_client_data) % 4
    client_data_bytes = base64.urlsafe_b64decode(
        encoded_client_data + "=" * missing_padding
    )
    # ECDSA(SHA256) hashes internally, so the raw bytes are signed directly.
    signature = sign_base64url(private_key, client_data_bytes)

    assertion = {
        "credId": cred_id,
        "clientData": encoded_client_data,
        "signature": signature,
    }
    signed_body = dict(payload)
    signed_body["signature"] = {
        "challengeIdentifier": challenge_id,
        "firstFactor": {
            "kind": "Key",
            "credentialAssertion": assertion,
        },
    }
    return api_call(
        "POST", path, body=signed_body, token=token, account_id=account_id,
    )


# ---------------------------------------------------------------------------
# Flow steps
# ---------------------------------------------------------------------------

def step_create_credential(
    token: str, name: str, private_key: ec.EllipticCurvePrivateKey, public_pem: str,
) -> str:
    """Register a key credential via Pattern 1 (direct signing).

    Requests a challenge from the init endpoint, signs a compact JSON
    document containing the returned ``clientDataHash`` and the public key,
    then submits the signature together with the challenge.

    Returns:
        The ``id`` of the created credential.
    """
    print(f"\n--- Creating credential: {name} ---")

    # Phase 1: ask the API for a challenge.
    init_reply = api_call(
        "POST", "/v1/vaults/credentials/init",
        body={"kind": "Key", "name": name}, token=token,
    )
    challenge = init_reply["challenge"]
    client_data_hash = init_reply["clientDataHash"]
    print(f"  Challenge received: {challenge[:40]}...")

    # Phase 2: sign the compact (no-whitespace) JSON encoding.
    to_sign = {"clientDataHash": client_data_hash, "publicKey": public_pem}
    message = json.dumps(to_sign, separators=(",", ":")).encode()
    signature = sign_hex(private_key, message)

    # Phase 3: submit the signed credential.
    submission = {
        "kind": "Key",
        "signature": signature,
        "publicKey": public_pem,
        "challenge": challenge,
    }
    created = api_call("POST", "/v1/vaults/credentials", token=token, body=submission)
    cred_id = created["id"]
    print(f"  Credential created: {cred_id}")
    # Reported as approved when the response has no "approveUntil" value.
    print(f"  Approved: {created.get('approveUntil') is None}")
    return cred_id


def step_create_wallet(
    token: str,
    private_key: ec.EllipticCurvePrivateKey,
    cred_id: str,
) -> tuple[str, str]:
    """Create a wallet (vault) through the Pattern 2 signing flow.

    Returns:
        ``(wallet_id, address)`` taken from the ``id`` and ``address``
        fields of the response.
    """
    print("\n--- Creating wallet (vault) ---")
    vault = challenge_response("/v1/vaults", {}, private_key, cred_id, token)
    wallet_id, address = vault["id"], vault["address"]
    print(f"  Wallet ID: {wallet_id}")
    print(f"  Address:   {address}")
    return wallet_id, address


def step_link_wallet(token: str, address: str, account_id: str) -> dict:
    """Link the wallet at *address* to *account_id*; return the API response."""
    print("\n--- Linking wallet to account ---")
    linked = api_call(
        "POST", "/v1/wallets",
        body={"address": address}, token=token, account_id=account_id,
    )
    # Show the wallet id when present, otherwise the whole response.
    shown = linked.get("id", linked)
    print(f"  Linked wallet: {shown}")
    return linked


def step_approve_credential(
    token: str,
    target_cred_id: str,
    signer_key: ec.EllipticCurvePrivateKey,
    signer_cred_id: str,
) -> dict:
    """Approve *target_cred_id* via Pattern 2, signed by another credential.

    The request is signed with *signer_key* under *signer_cred_id*, which is
    expected to be an already-approved credential.

    Returns:
        The parsed API response.
    """
    print(f"\n--- Approving credential: {target_cred_id} ---")
    approval_request = {"payload": {"credentialId": target_cred_id}}
    result = challenge_response(
        "/v1/vaults/credentials/approve",
        approval_request,
        signer_key,
        signer_cred_id,
        token,
    )
    # Reported as approved when the response has no "approveUntil" value.
    print(f"  Approved: {result.get('approveUntil') is None}")
    return result


def find_credential_by_pubkey(token: str, public_pem: str) -> str | None:
    """Look up an existing credential whose public key matches *public_pem*.

    Fetches the credential list and compares each entry's ``publicKey``
    against *public_pem*. Whitespace is ignored in the comparison, so
    differences in line endings, line wrapping or trailing newlines do not
    prevent a match.

    Args:
        token: Bearer token for the API.
        public_pem: PEM-encoded public key to search for.

    Returns:
        The ``id`` of the first matching credential, or ``None`` if no entry
        matches.

    Raises:
        urllib.error.HTTPError: If the API responds with an error status.
        urllib.error.URLError: On connection failures, including timeouts.
    """
    url = BASE_URL + "/v1/vaults/credentials"
    req = urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})
    # Bounded wait so an unresponsive server cannot hang the script.
    with urllib.request.urlopen(req, timeout=30) as resp:
        creds = json.loads(resp.read())

    wanted = "".join(public_pem.split())
    for cred in creds:
        # "publicKey" may be absent or null; treat both as "no key".
        stored = cred.get("publicKey") or ""
        if stored and "".join(stored.split()) == wanted:
            return cred["id"]
    return None


def step_list_credentials(token: str) -> list[dict]:
    """Fetch all credentials, print one summary line each, and return them.

    Args:
        token: Bearer token for the API.

    Returns:
        The credential list as decoded from the API response.

    Raises:
        urllib.error.HTTPError: If the API responds with an error status.
        urllib.error.URLError: On connection failures, including timeouts.
    """
    print("\n--- Listing credentials ---")
    url = BASE_URL + "/v1/vaults/credentials"
    req = urllib.request.Request(url, headers={
        "Authorization": f"Bearer {token}",
    })
    # Bounded wait so an unresponsive server cannot hang the script.
    with urllib.request.urlopen(req, timeout=30) as resp:
        creds = json.loads(resp.read())
    for c in creds:
        # A missing or null name would break the ":20s" format spec, so fall
        # back to "?" and coerce to str before formatting.
        name = str(c.get("name") or "?")
        print(f"  {c['id']}  name={name:20s}  "
              f"active={c.get('isActive')}  approveUntil={c.get('approveUntil')}")
    return creds


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def _write_private_key(path: str, pem: str) -> None:
    """Write *pem* to *path*, restricting the file to owner read/write."""
    # Create with 0600 so the key is never readable by other users, even
    # briefly. The mode passed to os.open only applies to newly created
    # files, so tighten a pre-existing file before writing the secret.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        os.chmod(path, 0o600)
        f.write(pem)


def main():
    """Run the managed-wallets demo end to end.

    Prompts for the API token and account ID, then:

      1. loads the primary key from the path in the ``PRIMARY_KEY``
         environment variable (reusing its credential if one already exists),
         or generates and saves a new key and credential;
      2. creates a wallet with the primary key;
      3. links that wallet to the account;
      4. creates a backup credential from a second, in-memory keypair;
      5. approves the backup credential with the primary key;
      6. creates a second wallet with the backup key;
      7. lists the final credential state.

    Returns early, after printing a message, if either prompt is left empty.
    HTTP errors from the API propagate to the caller.
    """
    print("=" * 60)
    print("Due Managed Wallets — API Demo")
    print("=" * 60)

    token = input("\nAPI token (Bearer): ").strip()
    account_id = input("Account ID: ").strip()

    if not token or not account_id:
        print("Both token and account ID are required.")
        return

    # Step 1 — Load or generate primary keypair & resolve credential
    key_file = os.environ.get("PRIMARY_KEY")  # path to existing PEM
    if key_file and os.path.exists(key_file):
        print(f"\n[1] Loading primary key from {key_file}...")
        pk1, pub1 = load_private_key(key_file)
        # Find existing credential by matching public key
        cred1 = find_credential_by_pubkey(token, pub1)
        if cred1:
            print(f"  Found existing credential: {cred1}")
        else:
            print("  No existing credential found for this key, creating...")
            cred1 = step_create_credential(token, "Primary key", pk1, pub1)
    else:
        print("\n[1] Generating primary keypair...")
        pk1, pub1 = generate_keypair()
        # If PRIMARY_KEY names a path that does not exist yet, save there;
        # otherwise use the default file name.
        out_path = key_file or "primary_key.pem"
        _write_private_key(out_path, export_private_pem(pk1))
        print(f"  Private key saved to: {out_path}")
        cred1 = step_create_credential(token, "Primary key", pk1, pub1)

    # Step 2 — Create wallet (only the address is needed afterwards)
    print("\n[2] Creating wallet...")
    _, address = step_create_wallet(token, pk1, cred1)

    # Step 3 — Link wallet to account
    print("\n[3] Linking wallet to account...")
    step_link_wallet(token, address, account_id)

    # Step 4 — Generate second keypair & create backup credential
    print("\n[4] Generating backup keypair...")
    pk2, pub2 = generate_keypair()
    cred2 = step_create_credential(token, "Backup key", pk2, pub2)

    # Step 5 — Approve backup credential (signed by primary)
    print("\n[5] Approving backup credential...")
    step_approve_credential(token, cred2, pk1, cred1)

    # Step 6 — Create second wallet using backup key; the step prints its
    # own result, so the return value is not needed here.
    print("\n[6] Creating second wallet (using backup key)...")
    step_create_wallet(token, pk2, cred2)

    # Final — List credentials
    print("\n[7] Final credential state:")
    step_list_credentials(token)

    print("\n" + "=" * 60)
    print("Demo complete!")
    print("=" * 60)


if __name__ == "__main__":
    main()