Stripe Interview Preparation Guide
Published on June 18, 2026
My preparation notes for the Stripe interview process. Using Python throughout and Zed as the editor.
Disclaimer: The views and opinions expressed in this post are entirely my own and based on publicly available information. The problem statements included here were sourced from various online communities and discussion forums — they are not confirmed to be actual Stripe interview questions. This is a personal study guide, not an official representation of Stripe’s hiring process.
The process
Stripe’s interview loop typically has five stages:
- Online Assessment — timed coding problems on an external platform. Usually algorithmic, with a focus on correctness and edge cases.
- Problem Solving — one or two technical interviews where you solve coding problems live with an engineer.
- Bug Bash — you’re given a broken codebase and asked to find and fix bugs. Tests the ability to read unfamiliar code quickly.
- Integration Round — design or extend a system, often involving APIs or working with existing abstractions. Less about algorithms, more about how you think about interfaces.
- Hiring Manager Round — behavioural and motivation questions. Why Stripe, how you’ve worked in the past, how you handle ambiguity.
Setup
Language: Python 3
Editor: Zed (setup guide)
Disable auto-completion before every technical round — it won’t be available in most assessment environments and you don’t want to depend on it:
// Zed settings (Ctrl+,)
{
"show_completions_on_input": false
}
Online Assessment
Problem statements and solutions to be added.
Problem Solving
Problem statements and solutions to be added.
Bug Bash
This round tests the scientific method in an unfamiliar codebase — not your ability to memorize APIs.
What you’re given: a clone of a slightly modified internal project, a description of a reported failure, and a test suite where some tests are failing. Your job is to find and fix the bug.
What the bugs look like
They’re rarely algorithmic. Expect logic or configuration errors:
- Validation too strict — regex rejecting valid emails containing
+, or a parser rejecting edge-case input it should accept - Platform assumptions — code that writes to a path without checking the directory exists first; works on the developer’s Mac, fails elsewhere
- Logic gaps — a condition that misses a specific case, e.g. an
ifthat handlesstatus == "active"andstatus == "inactive"but silently drops"pending" - Wrong library usage — passing arguments in the wrong order, using the wrong method variant, off-by-one in a range
Debugging methodology
Randomly changing lines until tests pass will fail you. Follow a disciplined five-step process:
1. Reproduction — confirm the failure first
Run the failing tests immediately. Do not look at the code yet. Do not touch the code yet. You need a stable, confirmed failure before you can tell whether any change you make actually fixed anything. If you change code before reproducing, you lose your baseline.
pytest tests/ -v
# note exactly which tests fail and what the error message says
2. Isolation — read the stack trace, not the whole codebase
Don’t start reading files top to bottom. Go straight to the stack trace. Find the exact line where the exception is raised or where the assertion fails. That’s your entry point into the code. Everything above it in the trace is call context — read upward only as needed.
3. Hypothesis generation — say it out loud
Form a specific, falsifiable statement before touching anything:
“I suspect the user ID is
Nonehere because the lookup is returning early”
“I think this regex doesn’t account for
+in the local part of the email”
Vague suspicions (“something seems off here”) lead to random edits. A named hypothesis leads to a targeted test.
4. Verification — add a print, confirm, then fix
Add a print or breakpoint() just before the suspect line to confirm your hypothesis:
print(f"DEBUG user_id={user_id!r}") # is it None?
print(f"DEBUG pattern match: {re.match(pattern, email)!r}")
breakpoint() # drop into pdb at runtime
Run only the failing test — not the whole suite — to iterate fast. Once confirmed, make the minimal fix.
5. Cleanup — leave it cleaner than you found it
Remove every print and breakpoint(). Run the full test suite. All previously passing tests must still pass. Commit with a clear message describing what was wrong and why.
How to prepare
The best practice is working with real bugs in real codebases:
-
Go to the requests or httpx GitHub repo. Filter issues by the
buglabel. Pick a closed one. Read the issue description, find the code that was broken, then read the PR that fixed it. Notice how small the fix usually is relative to the diagnosis work. -
Clone pandas or any library you know. Find a function, introduce a subtle break — change
<to<=, swap two arguments, remove a.strip(). Then write a test that catches it and use the methodology above to find it. This builds the muscle memory of going from a failing test to a root cause without flailing.
API Fundamentals
Stripe is a REST API company. The integration round and system design questions will expect you to be fluent in these basics.
HTTP Methods
| Method | Meaning | Example |
|---|---|---|
GET |
Read a resource, no side effects | GET /customers/cus_123 |
POST |
Create a new resource | POST /charges |
PUT |
Replace a resource entirely | PUT /customers/cus_123 (send all fields) |
PATCH |
Partially update a resource | PATCH /customers/cus_123 (send only changed fields) |
DELETE |
Remove a resource | DELETE /customers/cus_123 |
The distinction between PUT and PATCH matters: if you’re only updating a user’s status, use PATCH — PUT implies you’re replacing the whole object and anything you omit gets cleared.
HTTP Status Codes
2xx — Success
| Code | Meaning |
|---|---|
200 OK |
Standard success response |
201 Created |
Resource was created (successful POST) |
202 Accepted |
Request accepted but processing continues in the background (async jobs) |
4xx — Client errors (you messed up)
| Code | Meaning |
|---|---|
400 Bad Request |
Validation error, missing required field |
401 Unauthorized |
Not authenticated — no API key or invalid key |
403 Forbidden |
Authenticated but insufficient permissions |
404 Not Found |
Resource doesn’t exist |
429 Too Many Requests |
Rate limited — critical for Stripe integrations |
For 429, you must know how to handle it: exponential backoff. Retry after 1s, then 2s, 4s, 8s — with jitter to avoid thundering herd. Don’t hammer the API on a tight loop.
5xx — Server errors (Stripe messed up)
| Code | Meaning |
|---|---|
500 Internal Server Error |
Unexpected error on the server |
502 Bad Gateway |
Upstream service returned an invalid response |
503 Service Unavailable |
Server down for maintenance or overloaded |
5xx errors are safe to retry (unlike 4xx which will keep failing). Same exponential backoff applies.
Pagination: Cursor vs Offset
Offset pagination (?page=2&limit=10) is what most people reach for first. It’s inefficient and unstable:
- Inefficient: the database has to count and skip rows on every request — expensive at scale.
- Unstable: if items are added or deleted while you’re paging, you’ll see duplicates or miss records.
Cursor pagination (?starting_after=obj_id) is what Stripe uses. The cursor points to a specific record in the database, and the query fetches the next N records after it. No counting, no skipping.
# Stripe API example
page = stripe.Customer.list(limit=10)
while True:
process(page.data)
if not page.has_more:
break
page = stripe.Customer.list(limit=10, starting_after=page.data[-1].id)
The response includes has_more: true if there are more records beyond the current page, false otherwise — clean and unambiguous. Cursor pagination is both efficient (index seek, no full scan) and stable (the cursor anchors to a fixed position in the dataset regardless of concurrent writes).
JSON Serialization and Deserialization
REST APIs speak JSON. You need to be comfortable converting between Python objects and JSON without reaching for documentation.
Serialization — Python → JSON
import json
customer = {
"id": "cus_123",
"name": "Alice",
"email": "alice@example.com",
"balance": 0,
"metadata": {"plan": "pro"}
}
# Python dict → JSON string
json_string = json.dumps(customer)
print(json_string)
# '{"id": "cus_123", "name": "Alice", "email": "alice@example.com", ...}'
# Pretty-print (useful for debugging)
print(json.dumps(customer, indent=2))
# Python dict → JSON file
with open("customer.json", "w") as f:
json.dump(customer, f, indent=2)
Deserialization — JSON → Python
import json
# JSON string → Python dict
json_string = '{"id": "cus_123", "name": "Alice", "balance": 0}'
customer = json.loads(json_string)
print(customer["name"]) # Alice
print(type(customer)) # <class 'dict'>
# JSON file → Python dict
with open("customer.json", "r") as f:
customer = json.load(f)
Serializing custom objects
json.dumps only handles built-in types (dict, list, str, int, float, bool, None). For custom classes you need a custom encoder or convert to dict first:
import json
from dataclasses import dataclass, asdict
@dataclass
class Charge:
id: str
amount: int
currency: str
charge = Charge(id="ch_123", amount=2000, currency="usd")
# Option 1: convert to dict manually
json.dumps({"id": charge.id, "amount": charge.amount, "currency": charge.currency})
# Option 2: use asdict for dataclasses
json.dumps(asdict(charge))
# Option 3: custom encoder
class ChargeEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Charge):
return asdict(obj)
return super().default(obj)
json.dumps(charge, cls=ChargeEncoder)
Key distinction to remember:
| String | File | |
|---|---|---|
| Serialize | json.dumps(obj) |
json.dump(obj, f) |
| Deserialize | json.loads(s) |
json.load(f) |
dumps/loads — the s stands for string. dump/load — no s, takes a file handle.
Reading and Parsing Files Line by Line
Say you have a file with 100 lines, each containing a transaction: an integer ID, a float amount, and a float fee separated by commas:
1001,49.99,1.50
1002,120.00,3.60
1003,9.99,0.30
...
Reading and parsing:
transactions = []
with open("transactions.txt", "r") as f:
for line in f:
line = line.strip() # remove trailing newline and whitespace
if not line: # skip empty lines
continue
parts = line.split(",")
txn_id = int(parts[0])
amount = float(parts[1])
fee = float(parts[2])
transactions.append((txn_id, amount, fee))
# or more concisely with unpacking
with open("transactions.txt", "r") as f:
for line in f:
line = line.strip()
if not line:
continue
txn_id, amount, fee = line.split(",")
transactions.append((int(txn_id), float(amount), float(fee)))
Processing — total revenue, total fees:
total_amount = sum(t[1] for t in transactions)
total_fees = sum(t[2] for t in transactions)
net = total_amount - total_fees
print(f"Transactions: {len(transactions)}")
print(f"Total amount: {total_amount:.2f}")
print(f"Total fees: {total_fees:.2f}")
print(f"Net revenue: {net:.2f}")
Things to always do:
strip()every line — trailing\nwill breakint()/float()silently turning into aValueError- Guard against empty lines — files often have a trailing newline that produces a blank last line
- Use
with open(...)— it closes the file automatically even if an exception is raised
Does for line in f load the whole file into memory?
No — iterating over a file object in Python is lazy. It reads one line at a time using the OS’s buffered I/O (typically 8KB chunks internally), but exposes it to you one line at a time. The whole file is never in memory at once. This is safe and efficient even for multi-GB files.
The things that do load the whole file into memory: f.read() (one giant string) and f.readlines() (list of all lines). Avoid these for large files — use the for line in f loop instead.
Writing to a file:
transactions = [
(1001, 49.99, 1.50),
(1002, 120.00, 3.60),
(1003, 9.99, 0.30),
]
# Write (overwrites if file exists)
with open("output.txt", "w") as f:
for txn_id, amount, fee in transactions:
f.write(f"{txn_id},{amount},{fee}\n")
# Append (adds to end of file without overwriting)
with open("output.txt", "a") as f:
f.write(f"{1004},{75.00},{2.25}\n")
File modes:
| Mode | Behaviour |
|---|---|
"r" |
Read (default) — error if file doesn’t exist |
"w" |
Write — creates file if missing, truncates if it exists |
"a" |
Append — creates file if missing, preserves existing content |
"r+" |
Read and write — file must exist |
Integration Round
This round is less about algorithms and more about how you think through interfaces, data structures, and system design at the code level. You’ll be asked to build something real — not pseudocode.
Language note: Prefer Python over Java here. Less boilerplate means more time thinking about the actual problem. That said, Python’s dynamic typing is a double-edged sword — a typo in a variable name won’t be caught at compile time and will silently blow up at runtime. Be deliberate: name variables clearly, re-read your code before running, and test incrementally.
Question bank
Webhook delivery system Design an architecture for delivering webhook notifications to customers when a payment event occurs. The system must send HTTP requests with event payloads to all predefined merchant URLs. Requirements: payment events must never be lost, third-party endpoint failures must be retried, and merchants need a dashboard to view the history of all webhook attempts.
Core architecture:
- When a payment occurs, persist the event to a durable queue (e.g. a
webhook_eventsDB table or message queue) before attempting delivery — this is what guarantees no event is lost. - A worker pool picks up pending events and POSTs to each registered endpoint. On failure (non-2xx or timeout), write the attempt to a
webhook_attemptslog and re-enqueue with exponential backoff. - Each attempt (timestamp, status code, response body, latency) is stored so the dashboard can show the full delivery history per event.
Payment Service → [webhook_events table] → Delivery Workers → Merchant URL
↓
[webhook_attempts log] → Dashboard API
Retry policy: back off exponentially (1s, 2s, 4s, 8s…) up to a max (e.g. 72 hours), then mark the event as permanently failed and alert the merchant.
Follow-up 1: How would you add rate limiting? Use a sliding window per merchant — cap outgoing webhook requests to N per minute. Before dispatching a delivery, check the counter. If the merchant is at the limit, delay the attempt (reschedule it rather than dropping it). See the rate limiter question below.
Follow-up 2: Some merchants receive huge volumes while others receive very few. How do you ensure small merchants aren’t starved? A single global queue lets a high-volume merchant monopolise the workers. Instead, use per-merchant queues with round-robin dispatch across merchants. Each worker iteration picks one event from merchant A, one from merchant B, one from merchant C — regardless of how full each queue is. High-volume merchants get proportionally more throughput as queue depth grows, but they can never block small merchants entirely.
Real-time payment system with idempotency How would you design a real-time payment system that prevents duplicate charges? Key concept: idempotency keys. A client sends a unique key with each request; the server stores the result against that key and returns the cached result on retries rather than processing the charge again. Design the data model, the key storage, and the expiry policy.
import hashlib
import time
from dataclasses import dataclass, field
from enum import Enum
class PaymentStatus(Enum):
PENDING = "pending"
SUCCEEDED = "succeeded"
FAILED = "failed"
@dataclass
class PaymentResult:
status: PaymentStatus
charge_id: str
amount: int
currency: str
created_at: float = field(default_factory=time.time)
class PaymentProcessor:
def __init__(self, ttl_seconds: int = 86400): # keys expire after 24h
self._idempotency_store: dict[str, PaymentResult] = {}
self._ttl = ttl_seconds
def charge(self, idempotency_key: str, amount: int, currency: str) -> PaymentResult:
# return cached result if this key was seen before
if idempotency_key in self._idempotency_store:
cached = self._idempotency_store[idempotency_key]
if time.time() - cached.created_at < self._ttl:
return cached
# expired — remove and reprocess
del self._idempotency_store[idempotency_key]
result = self._process_charge(amount, currency)
self._idempotency_store[idempotency_key] = result
return result
def _process_charge(self, amount: int, currency: str) -> PaymentResult:
# in reality: call card network, write to DB, etc.
charge_id = hashlib.sha256(f"{amount}{currency}{time.time()}".encode()).hexdigest()[:16]
return PaymentResult(
status=PaymentStatus.SUCCEEDED,
charge_id=charge_id,
amount=amount,
currency=currency,
)
Key points:
- Check the store before processing — return the cached
PaymentResultimmediately on a duplicate key. The charge logic never runs twice. - The idempotency key is supplied by the client (e.g. a UUID they generate before the first attempt). The server never generates it.
- TTL prevents the store from growing unboundedly. 24 hours is Stripe’s policy.
- In production, the store is a database table (not an in-process dict) so it survives restarts and works across multiple server instances. The key column has a
UNIQUEconstraint — a race between two simultaneous requests with the same key is resolved by the DB rejecting one insert.
Currency exchange algorithm Design an algorithm that converts an amount from one currency to another given a table of exchange rates. Extension: what if a direct rate doesn’t exist and you need to chain conversions (USD → EUR → GBP)?
Parse currency/shipping rate string Parse a string in this format:
USD:CAD:DHL:5, USD:GBP:FedEx:3, EUR:USD:UPS:8
Each entry represents: source_currency:target_currency:carrier:rate. Write a method that takes a source currency, target currency, and amount, and returns the converted amount and the carrier to use.
def parse_rates(rate_string):
rates = {}
for entry in rate_string.split(", "):
src, tgt, carrier, rate = entry.split(":")
rates[(src, tgt)] = (carrier, float(rate))
return rates
def convert(rates, src, tgt, amount):
if (src, tgt) not in rates:
raise ValueError(f"No rate for {src} -> {tgt}")
carrier, rate = rates[(src, tgt)]
return amount * rate, carrier
Invoice scheduler Design a system for scheduling invoices to be sent at a future time. Think about: the data model for a scheduled job, how you’d persist and poll pending invoices, handling clock skew, and what happens if the scheduler crashes mid-send.
CSV processing and validation Given a CSV file of transactions, parse and validate the data. Common validation rules: required fields present, numeric fields are valid numbers, dates are in the expected format, no duplicate IDs. Return a list of valid rows and a list of errors with line numbers.
Shipping cost calculator Given a list of orders each with an associated country, calculate the shipping cost for each order. The rate table maps country → cost. Handle missing countries, bulk discounts, and free shipping thresholds.
Server load balancer Build the data structures and logic to manage a pool of servers and distribute client connections across them. Operations to implement:
connect(client_id)— assign a client to a server using the balancing strategydisconnect(client_id)— release a client’s connectionshutdown(server_id)— graceful shutdown: migrate all existing connections from this server to others, then remove it from the pool
Balancing strategies to know:
- Round robin — cycle through servers in order; simple, assumes homogeneous servers
- Least connections — assign to the server with the fewest active connections; better for variable-length sessions
- Consistent hashing — hash the client ID to a point on a ring; each server owns a range; adding/removing a server only remaps a fraction of clients — important for stateful sessions
from collections import defaultdict
class LoadBalancer:
def __init__(self, servers):
self.servers = list(servers) # available server IDs
self.connections = defaultdict(set) # server_id -> set of client_ids
self.client_map = {} # client_id -> server_id
self._rr_index = 0
def connect(self, client_id):
if not self.servers:
raise RuntimeError("No servers available")
server = self.servers[self._rr_index % len(self.servers)]
self._rr_index += 1
self.connections[server].add(client_id)
self.client_map[client_id] = server
return server
def disconnect(self, client_id):
server = self.client_map.pop(client_id, None)
if server:
self.connections[server].discard(client_id)
def shutdown(self, server_id):
# remove from rotation first so connect() won't pick it
self.servers.remove(server_id)
# migrate every existing client to a new server
for client_id in list(self.connections[server_id]):
self.disconnect(client_id)
self.connect(client_id)
del self.connections[server_id]
def least_connections_connect(self, client_id):
server = min(self.servers, key=lambda s: len(self.connections[s]))
self.connections[server].add(client_id)
self.client_map[client_id] = server
return server
Key points for the interview:
- Remove the server from
self.serversbefore migrating connections so thatconnect()during migration doesn’t route back to the shutting-down server. list(self.connections[server_id])— copy the set before iterating becausedisconnectmutates it.- Least-connections just needs
min(..., key=lambda s: len(self.connections[s]))— one line once the data structure is in place.
Invoice reminder email subjects Your invoicing system needs a configurable reminder schedule. Given an invoice and a schedule of day offsets relative to the due date, output the subject line of every email that would be sent, in sorted (chronological) order.
Example schedule: send an email 10 days before the due date, on the due date, 20 days after, and 30 days after.
from datetime import date, timedelta
SCHEDULE = [
(-10, "Reminder: Invoice #{id} is due in 10 days"),
(0, "Invoice #{id} is due today"),
(20, "Invoice #{id} is 20 days overdue — please pay"),
(30, "Final notice: Invoice #{id} is 30 days overdue"),
]
def email_subjects(invoice_id: str, due_date: date, schedule=SCHEDULE) -> list[str]:
emails = []
for offset, template in schedule:
send_date = due_date + timedelta(days=offset)
subject = template.format(id=invoice_id)
emails.append((send_date, subject))
emails.sort(key=lambda x: x[0]) # sort chronologically
return [subject for _, subject in emails]
# Example
subjects = email_subjects("INV-001", date(2024, 3, 15))
for s in subjects:
print(s)
# Reminder: Invoice #INV-001 is due in 10 days
# Invoice #INV-001 is due today
# Invoice #INV-001 is 20 days overdue — please pay
# Final notice: Invoice #INV-001 is 30 days overdue
The schedule is just a list of (offset, template) tuples — fully configurable, no special-casing. Sorting on send_date (a date object) is naturally chronological. The subject template uses .format(id=...) so the invoice ID is injected without hardcoding.
Word Search (LeetCode) Given a 2D board of characters and a word, return true if the word exists in the grid following adjacent cells (horizontal/vertical, no reuse). Classic DFS with backtracking — mark a cell visited before recursing, unmark on the way back out.
def exist(board: list[list[str]], word: str) -> bool:
rows, cols = len(board), len(board[0])
def dfs(r, c, i):
if i == len(word):
return True
if r < 0 or r >= rows or c < 0 or c >= cols or board[r][c] != word[i]:
return False
tmp, board[r][c] = board[r][c], "#" # mark visited in-place
found = (dfs(r+1, c, i+1) or
dfs(r-1, c, i+1) or
dfs(r, c+1, i+1) or
dfs(r, c-1, i+1))
board[r][c] = tmp # restore on the way back out
return found
return any(dfs(r, c, 0) for r in range(rows) for c in range(cols))
Key points:
- Temporarily overwrite the cell with
"#"to mark it visited — no separatevisitedset needed. - Restore the cell after the recursive call returns (backtracking). Without this, the mutation from one DFS path would corrupt subsequent paths.
- The base case
i == len(word)fires when every character has been matched — returnTrueimmediately. - Short-circuit with
or— as soon as one direction succeeds, stop exploring.
Rate limiter (100 requests per minute per user) Design a rate limiter that allows each unique user at most 100 requests per minute. The classic approach is a sliding window — track the timestamps of recent requests and evict ones outside the window.
import time
from collections import deque
class RateLimiter:
def __init__(self, max_requests: int = 100, window_seconds: int = 60):
self.max_requests = max_requests
self.window = window_seconds
self.user_windows: dict[str, deque] = {}
def is_allowed(self, user_id: str) -> bool:
now = time.time()
if user_id not in self.user_windows:
self.user_windows[user_id] = deque()
window = self.user_windows[user_id]
# evict timestamps that have fallen outside the window
while window and window[0] <= now - self.window:
window.popleft()
if len(window) < self.max_requests:
window.append(now)
return True
return False # rate limited — caller should return 429
Key points:
dequewithpopleft()is O(1) for eviction vs a list which is O(n).- Evict before checking the count — otherwise stale timestamps inflate the count.
- At scale, replace the in-process dict with Redis (use a sorted set keyed by
user_id, scored by timestamp). This makes it work across multiple server instances. - Trade-off: sliding window is accurate but uses O(requests) memory per user. Fixed window (just a counter + reset time) uses O(1) but allows bursts at window boundaries.
Document signing system (Docusign-like) Design the architecture for a system where a sender uploads a document, specifies signatories, and each signatory receives a link to sign. The document is considered complete when all parties have signed.
Core components:
- Document store — store the original PDF in object storage (S3). Never mutate the original; each signature creates a new version.
- Signing workflow state machine — track state per document:
draft → sent → partially_signed → completed | expired | voided. Each signatory has their ownpending → signed | declinedstate. - Audit trail — every event (sent, viewed, signed, declined) is appended to an immutable log with timestamp and IP.
- Notification service — email each signatory when it’s their turn. Email the sender when all parties have signed.
- Signed document generation — once all signatures are collected, render the final PDF with signature blocks embedded and store it separately.
Follow-up questions to be ready for: how do you prevent someone from signing twice? (idempotency token in the signing URL) How do you handle a signatory declining? (notify sender, allow resend or void) What if the document expires? (cron job scans for past-deadline sent documents and transitions them to expired).
import uuid
import time
from dataclasses import dataclass, field
from enum import Enum
class DocStatus(Enum):
DRAFT = "draft"
SENT = "sent"
PARTIALLY_SIGNED = "partially_signed"
COMPLETED = "completed"
EXPIRED = "expired"
VOIDED = "voided"
class SignatoryStatus(Enum):
PENDING = "pending"
SIGNED = "signed"
DECLINED = "declined"
@dataclass
class Signatory:
email: str
token: str = field(default_factory=lambda: uuid.uuid4().hex) # unique signing URL token
status: SignatoryStatus = SignatoryStatus.PENDING
signed_at: float | None = None
@dataclass
class Document:
doc_id: str
owner_email: str
signatories: list[Signatory]
status: DocStatus = DocStatus.DRAFT
expires_at: float = field(default_factory=lambda: time.time() + 7 * 86400) # 7 days
audit_log: list[dict] = field(default_factory=list)
def _log(self, event: str, actor: str, **kwargs):
self.audit_log.append({"event": event, "actor": actor, "ts": time.time(), **kwargs})
def send(self):
self.status = DocStatus.SENT
self._log("sent", self.owner_email)
for s in self.signatories:
self._notify(s.email, f"Please sign: /sign/{s.token}")
def sign(self, token: str) -> str:
if time.time() > self.expires_at:
return "expired"
signatory = next((s for s in self.signatories if s.token == token), None)
if not signatory:
return "invalid_token"
if signatory.status == SignatoryStatus.SIGNED:
return "already_signed" # idempotent — no double processing
signatory.status = SignatoryStatus.SIGNED
signatory.signed_at = time.time()
self._log("signed", signatory.email)
if all(s.status == SignatoryStatus.SIGNED for s in self.signatories):
self.status = DocStatus.COMPLETED
self._log("completed", "system")
self._notify(self.owner_email, "All parties have signed.")
else:
self.status = DocStatus.PARTIALLY_SIGNED
return "ok"
def decline(self, token: str) -> str:
signatory = next((s for s in self.signatories if s.token == token), None)
if not signatory:
return "invalid_token"
signatory.status = SignatoryStatus.DECLINED
self._log("declined", signatory.email)
self._notify(self.owner_email, f"{signatory.email} declined to sign.")
return "ok"
def void(self):
self.status = DocStatus.VOIDED
self._log("voided", self.owner_email)
def _notify(self, recipient: str, message: str):
print(f"[EMAIL] To: {recipient} | {message}") # replace with real email service
def expire_documents(documents: list[Document]):
now = time.time()
for doc in documents:
if doc.status == DocStatus.SENT and now > doc.expires_at:
doc.status = DocStatus.EXPIRED
doc._log("expired", "system")
Key points:
- Each signatory gets a unique
token(UUID) embedded in their signing URL — this is the idempotency mechanism. The server looks up the token, not a session or login. sign()checksalready_signedbefore doing any work — calling it twice with the same token is safe.- The
audit_logis append-only; nothing is ever removed from it. expire_documentsis meant to run as a cron job — scan allsentdocuments and flip any that are past their deadline.
Anti-fraud system — CSV aggregation
Given a CSV file with columns transaction_type, currency, date, amount, sum all amounts grouped by (date, transaction_type).
import csv
from collections import defaultdict
def aggregate(filename: str) -> dict[tuple, float]:
totals: dict[tuple, float] = defaultdict(float)
with open(filename) as f:
reader = csv.DictReader(f)
for row in reader:
key = (row["date"], row["transaction_type"])
totals[key] += float(row["amount"])
return dict(totals)
# Example output:
# {("2024-03-01", "charge"): 15000.0, ("2024-03-01", "refund"): 450.0, ...}
For fraud detection, flag groups that exceed a threshold:
def flag_suspicious(totals: dict[tuple, float], threshold: float = 10000.0):
return {k: v for k, v in totals.items() if v > threshold}
Key points: csv.DictReader uses the header row as keys — no manual column indexing. defaultdict(float) starts every key at 0.0 so += works without an existence check.
Brace Expansion I (LeetCode 1087)
Given a string like "{a,b,c}d{e,f}", return all words it expands to, sorted. Each {...} block is a choice — pick exactly one character from it.
def expand(s: str) -> list[str]:
groups = []
i = 0
while i < len(s):
if s[i] == '{':
j = s.index('}', i)
groups.append(sorted(s[i+1:j].split(',')))
i = j + 1
else:
groups.append([s[i]])
i += 1
result = ['']
for group in groups:
result = [prefix + c for prefix in result for c in group]
return sorted(result)
# expand("{a,b}c{d,e}") -> ["acd", "ace", "bcd", "bce"]
Build the result incrementally: start with [''] and for each group, take the Cartesian product of current prefixes × current group choices.
Brace Expansion II (LeetCode 1096)
Harder variant — braces can be nested and concatenation happens implicitly. "{a{b,c}}" → ["ab","ac"], "{a,b}{c,{d,e}}" → ["ac","ad","ae","bc","bd","be"].
Approach: recursive parser. A comma at depth 0 separates alternatives; everything else is concatenation. Each level returns a sorted deduplicated list of strings.
def braceExpansionII(expression: str) -> list[str]:
def parse(expr: str) -> list[str]:
groups = [['']] # list of "alternatives"; each alternative is a list of prefixes
depth = 0
start = 0
for i, c in enumerate(expr):
if c == '{':
if depth == 0:
start = i + 1
depth += 1
elif c == '}':
depth -= 1
if depth == 0:
sub = parse(expr[start:i])
groups[-1] = [a + b for a in groups[-1] for b in sub]
elif c == ',' and depth == 0:
groups.append([''])
elif depth == 0:
groups[-1] = [a + c for a in groups[-1]]
return sorted(set(w for g in groups for w in g))
return parse(expression)
Key points:
- Track
depthto know when you’re inside a nested{}— only act on,and plain characters at depth 0. - On closing
}at depth 0: recursively parse the inner content and take the Cartesian product with the current group’s prefixes. - Deduplicate with
setat each level since{a,a}should produce["a"]not["a","a"].
Sales data aggregation Given a list of sales records with varying input constraints (date range, region, product), calculate aggregate metrics. Focus on: grouping efficiently with dicts/defaultdict, handling missing keys gracefully, and clarifying the expected output format before writing any code.
Resources
- Stripe engineering blog
- LeetCode — filter by company tag: Stripe
- RoundZ — Stripe interview experiences
- 1point3acres — interview experiences and discussions, though the subscription is too costly