How I Built a Redis Server from Scratch in Python
Like many backend developers, I use Redis every day. It’s fast, reliable, and “just works.” But I realized I treated it like a black box. I knew how to use it, but not how it worked.
So I decided to build it from scratch. In Python.
The result is PyRedis: a fully functional Redis server that speaks the real RESP protocol, handles concurrent connections with asyncio, supports transactions, streams, and even master-replica replication. In this post I’ll walk you through the architecture and the most interesting problems I had to solve.
Why Build a Database from Scratch?
You might ask: “Why build a slower version of Redis in Python when the original C version exists?”
The goal was never to replace Redis. It was to understand it.
Building a database forces you to confront problems that don’t appear in typical web development:
- Raw socket handling: Parsing binary byte streams instead of HTTP requests.
- Concurrency without threads: Handling thousands of connections on a single thread.
- Distributed consistency: Keeping a replica in sync with a master in real time.
Reading documentation tells you what a system does. Building it tells you why it was designed that way.
The Protocol: RESP
The first thing I had to implement was RESP (Redis Serialization Protocol). Redis doesn’t speak JSON or HTTP. It uses its own binary-safe, text-based protocol. A simple GET mykey command looks like this on the wire:
*2\r\n$3\r\nGET\r\n$5\r\nmykey\r\n
That’s a RESP Array (*2) with two Bulk Strings ($3 and $5). Every type has a prefix character: + for simple strings, - for errors, : for integers, $ for bulk strings, and * for arrays.
I implemented this as two stateless classes: a RESPParser that converts raw bytes into Python objects, and a RESPEncoder that does the reverse. The encoder inspects the Python type and automatically chooses the correct RESP format:
# Usage:
RESPEncoder.encode("hello") # → b'$5\r\nhello\r\n' (Bulk String)
RESPEncoder.encode(42) # → b':42\r\n' (Integer)
RESPEncoder.encode({"ok": "PONG"}) # → b'+PONG\r\n' (Simple String)
RESPEncoder.encode({"error": "ERR"}) # → b'-ERR\r\n' (Error)
RESPEncoder.encode(["GET", "mykey"]) # → b'*2\r\n$3\r\nGET\r\n$5\r\nmykey\r\n'
The key design decision was making both parser and encoder stateless: each call is independent. This made testing trivial and eliminated an entire category of bugs.
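For a feel of how that type dispatch works, here is a minimal sketch of such an encoder. It mirrors the usage above but is illustrative, not the repo’s exact code (the real encoder handles more cases, such as the QUEUED reply used by transactions):

# Minimal sketch of a type-dispatching RESP encoder (illustrative).
class RESPEncoder:
    @staticmethod
    def encode(value) -> bytes:
        if isinstance(value, dict) and "ok" in value:      # Simple String
            return f"+{value['ok']}\r\n".encode()
        if isinstance(value, dict) and "error" in value:   # Error
            return f"-{value['error']}\r\n".encode()
        if isinstance(value, int):                         # Integer
            return f":{value}\r\n".encode()
        if isinstance(value, str):                         # Bulk String
            data = value.encode()
            return b"$" + str(len(data)).encode() + b"\r\n" + data + b"\r\n"
        if value is None:                                  # Null Bulk String
            return b"$-1\r\n"
        if isinstance(value, list):                        # Array (recursive)
            parts = b"".join(RESPEncoder.encode(item) for item in value)
            return b"*" + str(len(value)).encode() + b"\r\n" + parts
        raise TypeError(f"Cannot encode {type(value)!r} as RESP")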
Concurrency with asyncio
Redis is famously single-threaded. Instead of threads, it uses an event loop to multiplex thousands of connections on a single core. Python’s asyncio is a natural fit for this model.
The server itself is just a few lines:
# app/server.py
async def start_server(host="localhost", port=6379, handler=None):
    if handler is None:
        handler = handle_client
    server = await asyncio.start_server(handler, host, port)
    async with server:
        await server.serve_forever()
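Booting it is one asyncio.run call away; a minimal entry point, assuming the module layout implied by the file comments:

# Minimal entry point (sketch): run the event loop until interrupted.
import asyncio
from app.server import start_server

if __name__ == "__main__":
    asyncio.run(start_server())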
Each incoming connection gets its own coroutine. Inside, the handler reads bytes from the socket, parses them into a command, executes it, encodes the response, and writes it back:
# app/handler.py
async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    addr = writer.get_extra_info("peername")
    try:
        while True:
            data = await reader.read(1024)
            if not data:
                break
            command = RESPParser.parse(data)
            response = await execute_command(command, connection_id=addr,
                                             reader=reader, writer=writer)
            response_bytes = RESPEncoder.encode(response)
            writer.write(response_bytes)
            await writer.drain()
    finally:
        remove_transaction_context(connection_id=addr)
        ReplicaManager.remove_replica(addr)
        writer.close()
        await writer.wait_closed()
The await reader.read() call is where the magic happens. While one client is waiting for data, the event loop is free to serve other clients. No threads, no locks, no race conditions.
The Command Pattern
Each Redis command is its own class, inheriting from an abstract base:
# app/commands/base.py
from abc import ABC, abstractmethod
from typing import Any

class BaseCommand(ABC):
    @property
    @abstractmethod
    def name(self) -> str:
        """Return the command name (e.g., 'PING', 'GET')."""

    @property
    def is_write_command(self) -> bool:
        """Whether this command should be propagated to replicas."""
        return False

    @abstractmethod
    async def execute(self, args: list[str], connection_id=None) -> Any:
        """Execute the command."""
This makes adding a new command trivial. Here’s the complete SET implementation:
# app/commands/set.py
class SetCommand(BaseCommand):
    @property
    def name(self) -> str:
        return "SET"

    @property
    def is_write_command(self) -> bool:
        return True

    async def execute(self, args: list[str], connection_id=None):
        self.validate_args(args, min_args=2)
        key, value = args[0], args[1]
        if len(args) > 2:
            # Only the PX (expiry in milliseconds) option is supported
            if len(args) == 4 and args[2].upper() == "PX":
                px_value = int(args[3])
                storage = get_storage()
                storage.set_with_ttl(key, value, px_value)
                return {"ok": "OK"}
            else:
                raise ValueError("ERR syntax error")
        storage = get_storage()
        storage.set(key, value)
        return {"ok": "OK"}
The is_write_command property is critical: it tells the handler whether this command needs to be forwarded to replicas. Adding a new command is as simple as creating a file, implementing the interface, and registering it in the CommandRegistry.
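The CommandRegistry itself is conceptually just a name-to-instance map; a minimal sketch (the register/get method names are assumptions, not necessarily the repo’s API):

# Minimal sketch of a command registry (illustrative).
class CommandRegistry:
    _commands: dict[str, BaseCommand] = {}

    @classmethod
    def register(cls, command: BaseCommand) -> None:
        # Commands are looked up case-insensitively by their name
        cls._commands[command.name.upper()] = command

    @classmethod
    def get(cls, name: str) -> "BaseCommand | None":
        return cls._commands.get(name.upper())

# Registration at startup:
CommandRegistry.register(SetCommand())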
Transactions: MULTI/EXEC
Redis transactions queue commands and execute them atomically. Each connection gets a TransactionContext that tracks whether a transaction is active:
# app/transaction.py
class TransactionContext:
    def __init__(self):
        self._in_transaction = False
        self._queued_commands: List[tuple[str, List[str]]] = []

    def start_transaction(self):
        self._in_transaction = True
        self._queued_commands = []

    def queue_command(self, command_name: str, args: List[str]):
        self._queued_commands.append((command_name, args))
When a client is inside a MULTI block, the handler intercepts every command and queues it instead of executing immediately:
# app/handler.py (inside execute_command)
if transaction_ctx and transaction_ctx.in_transaction \
        and not command_obj.bypasses_transaction_queue:
    transaction_ctx.queue_command(command_name, command_args)
    return {"queued": "QUEUED"}
On EXEC, all queued commands run sequentially and their results are returned as an array. On DISCARD, the queue is dropped. The bypasses_transaction_queue flag ensures that MULTI, EXEC, and DISCARD themselves never get queued.
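EXEC itself isn’t shown above; conceptually it just drains the queue and collects results into one array. A minimal sketch (exec_transaction is a hypothetical helper, not necessarily the repo’s exact code):

# Sketch of EXEC: run each queued command in order and return
# the collected results as one RESP array.
async def exec_transaction(transaction_ctx, connection_id):
    results = []
    for command_name, args in transaction_ctx._queued_commands:
        command_obj = CommandRegistry.get(command_name)
        try:
            results.append(await command_obj.execute(args, connection_id=connection_id))
        except Exception as e:
            # Redis reports per-command errors inside the result array
            results.append({"error": str(e)})
    transaction_ctx._in_transaction = False
    transaction_ctx._queued_commands = []
    return results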
The Hardest Part: Replication
Implementing SET and GET is straightforward. Implementing replication (making one server mirror another) is where things get interesting.
Here’s how it works in PyRedis:
1. The Handshake. The replica connects to the master and performs a four-step handshake:
# app/replication.py
async def start_handshake(self):
    await self.connect()
    await self.send_ping()
    listening_port = ServerConfig.get_listening_port()
    await self.send_replconf_listening_port(listening_port)
    await self.send_replconf_capa()
    await self.send_psync()
First PING to verify the connection, then REPLCONF to announce capabilities, then PSYNC to request synchronization. The master responds with FULLRESYNC and sends an RDB snapshot.
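On the wire, each handshake step is an ordinary RESP array. Roughly, assuming a replica listening on port 6380 and the standard psync2 capability:

# The handshake as raw RESP bytes (illustrative):
b"*1\r\n$4\r\nPING\r\n"                                               # step 1
b"*3\r\n$8\r\nREPLCONF\r\n$14\r\nlistening-port\r\n$4\r\n6380\r\n"    # step 2
b"*3\r\n$8\r\nREPLCONF\r\n$4\r\ncapa\r\n$6\r\npsync2\r\n"             # step 3
b"*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n"                       # step 4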
2. Command Propagation. After the handshake, the master forwards every write command to all replicas through the ReplicaManager:
# app/replica_manager.py
@classmethod
async def propagate_command(cls, command_name: str, args: list[str]):
    if not cls._replicas:
        return
    command_array = [command_name.upper(), *args]
    encoded = RESPEncoder.encode(command_array)
    for connection_id, (reader, writer) in cls._replicas.items():
        try:
            writer.write(encoded)
            await writer.drain()
        except Exception as e:
            logger.error(f"Error propagating to {connection_id}: {e}")
    cls._master_offset += len(encoded)
3. Acknowledgments. The WAIT command lets a client block until a certain number of replicas have confirmed they received all commands up to a given offset. This is implemented using asyncio.Condition:
# app/replica_manager.py (simplified)
@classmethod
async def wait_for_replication(cls, numreplicas: int, timeout_ms: int) -> int:
    target_offset = cls._master_offset
    # Ask every replica to report the offset it has processed so far
    getack_command = RESPEncoder.encode(["REPLCONF", "GETACK", "*"])
    for connection_id, (reader, writer) in cls._replicas.items():
        writer.write(getack_command)
        await writer.drain()
    # Wait for ACKs with timeout; _wait_condition (omitted here) loops on
    # the condition, re-checking the ACK count each time it is notified
    condition = cls._get_condition()
    try:
        return await asyncio.wait_for(
            _wait_condition(),
            timeout=timeout_ms / 1000.0,
        )
    except asyncio.TimeoutError:
        # On timeout, return however many replicas had caught up
        return cls._count_acks(target_offset)
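The other half of that exchange is the ACK handler. When a replica answers REPLCONF ACK &lt;offset&gt;, the master records the offset and wakes any blocked WAIT. A minimal sketch (the _replica_offsets map and method names are assumptions consistent with the snippet above):

# Sketch: record a replica's acknowledged offset, then notify the
# condition so wait_for_replication can re-check its ACK count.
@classmethod
async def handle_ack(cls, connection_id, offset: int):
    cls._replica_offsets[connection_id] = offset
    condition = cls._get_condition()
    async with condition:
        condition.notify_all()

@classmethod
def _count_acks(cls, target_offset: int) -> int:
    # A replica is "caught up" once its acked offset reaches the target
    return sum(1 for off in cls._replica_offsets.values() if off >= target_offset)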
Getting the replication protocol right was, by far, the most time-consuming part of the project. The interplay between offset tracking, asynchronous ACKs, and timeout management required careful coordination.
Streams and Blocking Reads
The most complex data structure I implemented was Redis Streams. XADD appends timestamped entries with auto-generated IDs. XREAD BLOCK lets a client wait for new data to arrive on a stream, which requires a cooperative blocking mechanism:
# app/commands/xread.py (simplified)
async def execute(self, args, connection_id=None):
    block_timeout, streams = self._parse_args(args)
    # Try an immediate read first
    result = self._query_streams(streams)
    if result is not None:
        return result
    # If not blocking, return right away
    if block_timeout is None:
        return None
    # Register a waiter event for each stream key, then block
    events = []
    for key, _last_id in streams:  # streams holds (key, last_id) pairs
        event = asyncio.Event()
        register_waiter(key, event)
        events.append((key, event))
    # Wait for a notification on any stream, or until the timeout
    done, pending = await asyncio.wait(
        [asyncio.create_task(event.wait()) for _, event in events],
        timeout=block_timeout,
        return_when=asyncio.FIRST_COMPLETED,
    )
    # Re-query after wake-up
    return self._query_streams(streams)
When another client runs XADD, the storage layer notifies all registered waiters, waking up the blocked XREAD coroutines.
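The waiter registry that makes this work can be a plain dict of per-key event lists. A minimal sketch, reusing the register_waiter name from the snippet above (notify_waiters and the dict layout are assumptions):

import asyncio
from collections import defaultdict

# Sketch: per-key lists of asyncio.Events that blocked XREADs wait on.
_waiters: dict[str, list[asyncio.Event]] = defaultdict(list)

def register_waiter(key: str, event: asyncio.Event) -> None:
    _waiters[key].append(event)

def notify_waiters(key: str) -> None:
    # Called by XADD after appending an entry: wake every blocked reader
    for event in _waiters.pop(key, []):
        event.set()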
Storage Layer
The storage layer is a Python dict with type awareness. Each key stores a typed wrapper (RedisString, RedisList, or RedisStream), and a decorator enforces type safety:
# app/storage/memory.py
class InMemoryStorage(BaseStorage):
    def __init__(self):
        self._data: dict[str, RedisValue] = {}
        self._expiry: dict[str, float] = {}  # key → monotonic expiration time

    def _is_expired(self, key: str) -> bool:
        if key not in self._expiry:
            return False
        return time.monotonic() > self._expiry[key]

    @require_type(RedisType.STRING)
    def get(self, key: str) -> Optional[str]:
        if key not in self._data:
            return None
        if self._is_expired(key):
            del self._data[key]
            del self._expiry[key]
            return None
        return self._data[key].value
TTL uses time.monotonic() instead of wall-clock time, which prevents bugs caused by system clock changes. Expiration is lazy: keys are only cleaned up when accessed.
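For completeness, the write path that the SET command’s PX branch calls can look like this: a sketch consistent with the snippets above (the RedisString constructor is an assumption):

# Sketch: store the value and record its expiration on the monotonic clock.
def set_with_ttl(self, key: str, value: str, ttl_ms: int) -> None:
    self._data[key] = RedisString(value)
    self._expiry[key] = time.monotonic() + ttl_ms / 1000.0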
What I Learned
This project taught me more about systems programming than any tutorial or course I’ve taken.
- Protocol design matters. RESP looks simple, but its prefix-based type system is extremely efficient to parse. There’s a reason Redis didn’t use JSON.
- asyncio is powerful, not simple. The happy path is easy. Coordinating timeouts, cancellations, and blocking operations across coroutines requires real discipline.
- Replication is a distributed systems problem. Even a simplified version forces you to think about ordering, consistency, and failure recovery.
- The Command Pattern pays off. Having each command as an isolated unit made the codebase easy to extend and test. I went from 5 to 25+ commands without ever touching the handler logic.
- Testing is non-negotiable. You can’t build a database without trusting your test suite. pytest-asyncio was invaluable for testing async code.
Architecture at a Glance
Client → TCP → RESP Parser → Command Handler → Command Registry → Storage
                                    ↓                   ↓
                             Transaction Mgr     Replica Manager → Replicas
The full architecture guide with Mermaid diagrams is available in the repository docs.
Try It Out
The entire codebase is open source and designed to be readable. If you’re learning asyncio, system design, or just curious about how Redis works internally, I hope you find it useful.
git clone https://github.com/albertopastormr/pyredis.git
cd pyredis
uv sync
./run-redis.sh
Feedback, issues, and contributions are welcome.