Python demo script
# /// script
# requires-python = ">=3.10"
# dependencies = ["cryptography"]
# ///
"""
Due Managed Wallets API Demo
=============================
Self-contained script that walks through the complete managed wallets flow
against the Due sandbox API:
1. Create first credential (primary signing key)
2. Create a wallet (vault)
3. Link the wallet to a Due account
4. Add a second credential (backup key)
5. Approve the backup credential using the primary key
6. Create a second wallet using the backup key
Prerequisites
-------------
- Python 3.10+
- uv (https://docs.astral.sh/uv/) — recommended, handles dependencies automatically
- A Due sandbox API token (Bearer)
- A Due account ID
Running
-------
uv run managed_wallet_demo.py
The script will prompt for your API token and account ID.
No manual `pip install` is needed — uv reads the inline metadata above
and installs `cryptography` automatically.
To reuse a primary key from a previous run:
PRIMARY_KEY=primary_key.pem uv run managed_wallet_demo.py
Alternatively, if you prefer pip:
pip install cryptography
python managed_wallet_demo.py
Important
---------
- Targets the sandbox environment (https://api.sandbox.due.network).
- On first run the primary private key is saved to `primary_key.pem`.
Pass PRIMARY_KEY=primary_key.pem on subsequent runs to reuse it.
"""
import base64
import json
import os
import sys
import urllib.request
import urllib.error
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import hashes, serialization
BASE_URL = "https://api.sandbox.due.network"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def generate_keypair() -> tuple[ec.EllipticCurvePrivateKey, str]:
"""Generate a random EC P-256 keypair. Returns (private_key, public_pem_str)."""
private_key = ec.generate_private_key(ec.SECP256R1())
public_pem = private_key.public_key().public_bytes(
serialization.Encoding.PEM,
serialization.PublicFormat.SubjectPublicKeyInfo,
).decode()
return private_key, public_pem
def export_private_pem(private_key: ec.EllipticCurvePrivateKey) -> str:
"""Export private key as PEM string."""
return private_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.PKCS8,
serialization.NoEncryption(),
).decode()
def load_private_key(pem_path: str) -> tuple[ec.EllipticCurvePrivateKey, str]:
"""Load a private key from PEM file. Returns (private_key, public_pem_str)."""
with open(pem_path, "rb") as f:
private_key = serialization.load_pem_private_key(f.read(), password=None)
public_pem = private_key.public_key().public_bytes(
serialization.Encoding.PEM,
serialization.PublicFormat.SubjectPublicKeyInfo,
).decode()
return private_key, public_pem
def sign_hex(private_key: ec.EllipticCurvePrivateKey, data: bytes) -> str:
"""SHA-256 sign data and return hex-encoded DER signature."""
sig = private_key.sign(data, ec.ECDSA(hashes.SHA256()))
return sig.hex()
def sign_base64url(private_key: ec.EllipticCurvePrivateKey, data: bytes) -> str:
"""SHA-256 sign data and return base64url-encoded DER signature (no padding)."""
sig = private_key.sign(data, ec.ECDSA(hashes.SHA256()))
return base64.urlsafe_b64encode(sig).rstrip(b"=").decode()
def api_call(
method: str,
path: str,
body: dict | None = None,
token: str = "",
account_id: str = "",
expect_403: bool = False,
) -> dict:
"""Make an API call. Returns parsed JSON response.
If expect_403 is True, a 403 is treated as a normal response (challenge).
"""
url = BASE_URL + path
headers = {"Content-Type": "application/json"}
if token:
headers["Authorization"] = f"Bearer {token}"
if account_id:
headers["Due-Account-Id"] = account_id
data = json.dumps(body or {}).encode()
req = urllib.request.Request(url, data=data, headers=headers, method=method)
try:
with urllib.request.urlopen(req) as resp:
return json.loads(resp.read())
except urllib.error.HTTPError as e:
resp_body = e.read().decode()
if expect_403 and e.code == 403:
return json.loads(resp_body)
print(f" HTTP {e.code}: {resp_body}")
raise
def challenge_response(
path: str,
payload: dict,
private_key: ec.EllipticCurvePrivateKey,
cred_id: str,
token: str,
account_id: str = "",
) -> dict:
"""Execute the challenge-response signing flow (Pattern 2).
1. POST with payload → receive 403 with challenge
2. Sign clientData with private_key
3. Retry POST with signature block
"""
# Step 1: initial request → 403 challenge
resp = api_call("POST", path, body=payload, token=token,
account_id=account_id, expect_403=True)
if resp.get("code") != "ACTION_SIGNATURE_REQUIRED":
# If it succeeded directly (no challenge), just return
return resp
challenge_id = resp["data"]["challengeIdentifier"]
client_data_b64 = resp["data"]["factors"]["Key"]["clientData"]
# Step 2: sign (clientData may be base64url without padding)
padded = client_data_b64 + "=" * (-len(client_data_b64) % 4)
client_data_raw = base64.urlsafe_b64decode(padded)
# sign() with ECDSA(SHA256) already hashes internally — pass raw data
sig = sign_base64url(private_key, client_data_raw)
# Step 3: retry with signature
signed_body = {
**payload,
"signature": {
"challengeIdentifier": challenge_id,
"firstFactor": {
"kind": "Key",
"credentialAssertion": {
"credId": cred_id,
"clientData": client_data_b64,
"signature": sig,
},
},
},
}
return api_call("POST", path, body=signed_body, token=token,
account_id=account_id)
# ---------------------------------------------------------------------------
# Flow steps
# ---------------------------------------------------------------------------
def step_create_credential(
token: str, name: str, private_key: ec.EllipticCurvePrivateKey, public_pem: str,
) -> str:
"""Create a credential using Pattern 1 (direct signing). Returns credentialId."""
print(f"\n--- Creating credential: {name} ---")
# Init
init = api_call("POST", "/v1/vaults/credentials/init",
body={"kind": "Key", "name": name}, token=token)
challenge = init["challenge"]
client_data_hash = init["clientDataHash"]
print(f" Challenge received: {challenge[:40]}...")
# Build JSON to sign
challenge_json = json.dumps(
{"clientDataHash": client_data_hash, "publicKey": public_pem},
separators=(",", ":"),
)
sig = sign_hex(private_key, challenge_json.encode())
# Submit
cred = api_call("POST", "/v1/vaults/credentials", token=token, body={
"kind": "Key",
"signature": sig,
"publicKey": public_pem,
"challenge": challenge,
})
cred_id = cred["id"]
approved = cred.get("approveUntil") is None
print(f" Credential created: {cred_id}")
print(f" Approved: {approved}")
return cred_id
def step_create_wallet(
token: str,
private_key: ec.EllipticCurvePrivateKey,
cred_id: str,
) -> tuple[str, str]:
"""Create a wallet (vault) using Pattern 2. Returns (wallet_id, address)."""
print("\n--- Creating wallet (vault) ---")
resp = challenge_response("/v1/vaults", {}, private_key, cred_id, token)
wallet_id = resp["id"]
address = resp["address"]
print(f" Wallet ID: {wallet_id}")
print(f" Address: {address}")
return wallet_id, address
def step_link_wallet(token: str, address: str, account_id: str) -> dict:
"""Link wallet to an account."""
print("\n--- Linking wallet to account ---")
resp = api_call("POST", "/v1/wallets",
body={"address": address}, token=token,
account_id=account_id)
print(f" Linked wallet: {resp.get('id', resp)}")
return resp
def step_approve_credential(
token: str,
target_cred_id: str,
signer_key: ec.EllipticCurvePrivateKey,
signer_cred_id: str,
) -> dict:
"""Approve a credential using Pattern 2 (signed by an already-approved cred)."""
print(f"\n--- Approving credential: {target_cred_id} ---")
resp = challenge_response(
"/v1/vaults/credentials/approve",
{"payload": {"credentialId": target_cred_id}},
signer_key, signer_cred_id, token,
)
approved = resp.get("approveUntil") is None
print(f" Approved: {approved}")
return resp
def find_credential_by_pubkey(token: str, public_pem: str) -> str | None:
"""Find an existing credential ID by matching public key. Returns credentialId or None."""
url = BASE_URL + "/v1/vaults/credentials"
req = urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})
with urllib.request.urlopen(req) as resp:
creds = json.loads(resp.read())
pub_normalized = public_pem.strip()
for c in creds:
if c.get("publicKey", "").strip() == pub_normalized:
return c["id"]
return None
def step_list_credentials(token: str) -> list[dict]:
"""List all credentials."""
print("\n--- Listing credentials ---")
url = BASE_URL + "/v1/vaults/credentials"
req = urllib.request.Request(url, headers={
"Authorization": f"Bearer {token}",
})
with urllib.request.urlopen(req) as resp:
creds = json.loads(resp.read())
for c in creds:
print(f" {c['id']} name={c.get('name', '?'):20s} "
f"active={c.get('isActive')} approveUntil={c.get('approveUntil')}")
return creds
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
print("=" * 60)
print("Due Managed Wallets — API Demo")
print("=" * 60)
token = input("\nAPI token (Bearer): ").strip()
account_id = input("Account ID: ").strip()
if not token or not account_id:
print("Both token and account ID are required.")
return
# Step 1 — Load or generate primary keypair & resolve credential
key_file = os.environ.get("PRIMARY_KEY") # path to existing PEM
if key_file and os.path.exists(key_file):
print(f"\n[1] Loading primary key from {key_file}...")
pk1, pub1 = load_private_key(key_file)
# Find existing credential by matching public key
cred1 = find_credential_by_pubkey(token, pub1)
if cred1:
print(f" Found existing credential: {cred1}")
else:
print(" No existing credential found for this key, creating...")
cred1 = step_create_credential(token, "Primary key", pk1, pub1)
else:
print("\n[1] Generating primary keypair...")
pk1, pub1 = generate_keypair()
out_path = key_file or "primary_key.pem"
with open(out_path, "w") as f:
f.write(export_private_pem(pk1))
print(f" Private key saved to: {out_path}")
cred1 = step_create_credential(token, "Primary key", pk1, pub1)
# Step 2 — Create wallet
print("\n[2] Creating wallet...")
wallet_id, address = step_create_wallet(token, pk1, cred1)
# Step 3 — Link wallet to account
print("\n[3] Linking wallet to account...")
step_link_wallet(token, address, account_id)
# Step 4 — Generate second keypair & create backup credential
print("\n[4] Generating backup keypair...")
pk2, pub2 = generate_keypair()
cred2 = step_create_credential(token, "Backup key", pk2, pub2)
# Step 5 — Approve backup credential (signed by primary)
print("\n[5] Approving backup credential...")
step_approve_credential(token, cred2, pk1, cred1)
# Step 6 — Create second wallet using backup key
print("\n[6] Creating second wallet (using backup key)...")
wallet_id2, address2 = step_create_wallet(token, pk2, cred2)
# Final — List credentials
print("\n[7] Final credential state:")
step_list_credentials(token)
print("\n" + "=" * 60)
print("Demo complete!")
print("=" * 60)
if __name__ == "__main__":
main()Updated 14 days ago