ev-node-explainer
Explains ev-node architecture, components, and internal workings. Use when the user asks how ev-node works, wants to understand the block package, DA layer, sequencing, namespaces, or needs architecture explanations. Covers block production, syncing, DA submission, forced inclusion, single vs based sequencer, and censorship resistance.
Packaged view
This page reorganizes the original catalog entry to put fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install evstack-ev-node-ev-node-explainer
Repository
Skill path: .claude/skills/ev-node-explainer
Best for
Primary workflow: Ship Full Stack.
Technical facets: Full Stack.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: evstack.
This is a mirrored public skill entry. Review the repository before installing it into production workflows.
What it helps with
- Install ev-node-explainer into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/evstack/ev-node before adding ev-node-explainer to shared team environments
- Use ev-node-explainer for development workflows
Works across Claude Code, Codex CLI, Gemini CLI, and OpenCode.
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: ev-node-explainer
description: Explains ev-node architecture, components, and internal workings. Use when the user asks how ev-node works, wants to understand the block package, DA layer, sequencing, namespaces, or needs architecture explanations. Covers block production, syncing, DA submission, forced inclusion, single vs based sequencer, and censorship resistance.
---
# ev-node Architecture Explainer
ev-node is a sovereign rollup framework that allows building rollups on any Data Availability (DA) layer. It follows a modular architecture where components can be swapped.
**Reference files:**
- [block-architecture.md](block-architecture.md) - Block package deep dive
- [da-sequencing.md](da-sequencing.md) - DA and sequencing deep dive
## Core Principles
1. **Zero-dependency core** - `core/` contains only interfaces, no external deps
2. **Modular components** - Executor, Sequencer, DA are pluggable
3. **Two operating modes** - Aggregator (produces blocks) and Sync-only (follows chain)
4. **Separation of concerns** - Block production, syncing, and DA submission are independent
## Package Overview
| Package | Responsibility |
|---------|---------------|
| `core/` | Interfaces only (Executor, Sequencer) |
| `types/` | Data structures (Header, Data, State, SignedHeader) |
| `block/` | Block lifecycle management |
| `execution/` | Execution layer implementations (EVM, ABCI) |
| `node/` | Node initialization and orchestration |
| `pkg/p2p/` | libp2p-based networking |
| `pkg/store/` | Persistent storage |
| `pkg/da/` | DA layer abstraction |
## Block Package Deep Dive
The block package is the most complex part of ev-node. See [block-architecture.md](block-architecture.md) for the complete breakdown.
### Component Summary
```
Components struct:
├── Executor - Block production (Aggregator only)
├── Reaper - Transaction scraping (Aggregator only)
├── Syncer - Block synchronization
├── Submitter - DA submission and inclusion
└── Cache - Unified state caching
```
### Entry Points
- `NewAggregatorComponents()` - Full node that produces and syncs blocks
- `NewSyncComponents()` - Non-aggregator that only syncs
### Key Data Types
**Header** - Block metadata (height, time, hashes, proposer)
**Data** - Transaction list with metadata
**SignedHeader** - Header with proposer signature
**State** - Chain state (last block, app hash, DA height)
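A condensed sketch of how these types relate; field names follow the examples later in this document, and the real definitions in `types/` carry more fields and methods:
```go
// Condensed sketch only; see types/ for the real definitions.
type Header struct {
    Height          uint64
    Time            int64  // Unix nanoseconds
    LastHeaderHash  Hash   // Link to the previous block
    DataHash        Hash   // Commitment to this block's Data
    AppHash         Hash   // State root after execution
    ProposerAddress []byte
}

type Data struct {
    Txs Txs // Ordered transaction list
}

type SignedHeader struct {
    Header
    Signature []byte // Proposer's signature over the header
}

type State struct {
    LastBlockHeight uint64
    LastHeaderHash  Hash
    AppHash         Hash
    DAHeight        uint64 // Last DA height processed
}
```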
## Block Production Flow (Aggregator)
```
Sequencer.GetNextBatch()
│
▼
Executor.ExecuteTxs()
│
├──► SignedHeader + Data
│
├──► P2P Broadcast
│
└──► Submitter Queue
│
▼
DA Layer
```
## Block Sync Flow (Non-Aggregator)
```
┌─────────────────────────────────────┐
│ Syncer │
├─────────────┬─────────────┬─────────┤
│ DA Worker │ P2P Worker │ Forced │
│ │ │ Incl. │
└──────┬──────┴──────┬──────┴────┬────┘
│ │ │
└─────────────┴───────────┘
│
▼
processHeightEvent()
│
▼
ExecuteTxs → Update State
```
## Data Availability Layer
The DA layer abstracts blob storage. ev-node uses Celestia but the interface is pluggable. See [da-sequencing.md](da-sequencing.md) for full details.
### Namespaces
DA uses 29-byte namespaces (1 byte version + 28 byte ID). Three namespaces are used:
| Namespace | Purpose |
|-----------|---------|
| Header | Block headers |
| Data | Transaction data (optional, can share with header) |
| Forced Inclusion | User-submitted txs for censorship resistance |
### DA Client Interface
```go
type Client interface {
    Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) ResultSubmit
    Retrieve(ctx context.Context, height uint64, namespace []byte) ResultRetrieve
    Get(ctx context.Context, ids []ID, namespace []byte) ([]Blob, error)
}
```
### Key Files
| File | Purpose |
|------|---------|
| `pkg/da/types/types.go` | Core types (Blob, ID, Commitment) |
| `pkg/da/types/namespace.go` | Namespace handling |
| `block/internal/da/client.go` | DA client wrapper |
| `block/internal/da/forced_inclusion_retriever.go` | Forced tx retrieval |
---
## Sequencing
Sequencers order transactions for block production. See [da-sequencing.md](da-sequencing.md) for full details.
### Two Modes
| Mode | Mempool | Forced Inclusion | Use Case |
|------|---------|------------------|----------|
| **Single** | Yes | Yes | Traditional rollup |
| **Based** | No | Only source | High liveness guarantee |
### Sequencer Interface
```go
type Sequencer interface {
    SubmitBatchTxs(ctx context.Context, req SubmitBatchTxsRequest) (*SubmitBatchTxsResponse, error)
    GetNextBatch(ctx context.Context, req GetNextBatchRequest) (*GetNextBatchResponse, error)
    VerifyBatch(ctx context.Context, req VerifyBatchRequest) (*VerifyBatchResponse, error)
SetDAHeight(height uint64)
GetDAHeight() uint64
}
```
### ForceIncludedMask
Batches include a mask distinguishing tx sources:
```go
type Batch struct {
Transactions [][]byte
ForceIncludedMask []bool // true = from DA (must validate)
}
```
This allows the execution layer to skip validation for already-validated mempool txs.
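As a rough illustration, an execution layer can key validation off the mask; `validateTx` below is a hypothetical helper, not ev-node API:
```go
// Hypothetical sketch: ForceIncludedMask[i] corresponds to Transactions[i].
func validateBatch(batch *Batch) error {
    for i, tx := range batch.Transactions {
        // nil mask = backward compatibility: validate everything.
        mustValidate := batch.ForceIncludedMask == nil || batch.ForceIncludedMask[i]
        if !mustValidate {
            continue // Mempool tx, validated when it was submitted
        }
        if err := validateTx(tx); err != nil {
            return fmt.Errorf("forced tx %d invalid: %w", i, err)
        }
    }
    return nil
}
```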
### Key Files
| File | Purpose |
|------|---------|
| `core/sequencer/sequencing.go` | Core interface |
| `pkg/sequencers/single/sequencer.go` | Hybrid sequencer |
| `pkg/sequencers/based/sequencer.go` | Pure DA sequencer |
| `pkg/sequencers/common/checkpoint.go` | Shared checkpoint logic |
---
## Forced Inclusion
Forced inclusion prevents sequencer censorship:
1. User submits tx directly to DA layer
2. Syncer detects tx in forced-inclusion namespace
3. Grace period starts (adjusts based on block fullness)
4. If not included by sequencer within grace period → sequencer marked malicious
5. Tx gets included regardless
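Step 1 can use the DA client directly; a minimal sketch (the gas price is illustrative, and `GetForcedInclusionNamespace` comes from the full client interface in [da-sequencing.md](da-sequencing.md)):
```go
// Sketch: submit a raw tx straight to the forced-inclusion namespace,
// bypassing the sequencer entirely. The gas price value is illustrative.
func submitForced(ctx context.Context, c Client, rawTx []byte) error {
    res := c.Submit(ctx, [][]byte{rawTx}, 0.002, c.GetForcedInclusionNamespace(), nil)
    if res.Code != StatusSuccess {
        return fmt.Errorf("DA submit failed with status %v", res.Code)
    }
    return nil
}
```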
## Key Files
| File | Purpose |
|------|---------|
| `block/public.go` | Exported types and factories |
| `block/components.go` | Component creation |
| `block/internal/executing/executor.go` | Block production |
| `block/internal/syncing/syncer.go` | Sync orchestration |
| `block/internal/submitting/submitter.go` | DA submission |
| `block/internal/cache/manager.go` | Unified cache |
## Common Questions
### How does block production work?
The Executor runs `executionLoop()`:
1. Wait for block time or new transactions
2. Get batch from sequencer
3. Execute via execution layer
4. Create SignedHeader + Data
5. Broadcast to P2P
6. Queue for DA submission
### How does syncing work?
The Syncer coordinates three workers:
- **DA Worker** - Fetches confirmed blocks from DA
- **P2P Worker** - Receives gossiped blocks
- **Forced Inclusion** - Monitors for censored txs
All feed into `processHeightEvent()` which validates and executes.
### What happens if DA submission fails?
Submitter has retry logic with exponential backoff. Status codes:
- `TooBig` - Splits blob into chunks
- `AlreadyInMempool` - Skips (duplicate)
- `NotIncludedInBlock` - Retries with backoff
- `ContextCanceled` - Request canceled
### How is state recovered after crash?
The Replayer syncs execution layer from disk:
1. Load last committed height from store
2. Check execution layer height
3. Replay any missing blocks
4. Ensure consistency before starting
## Architecture Diagrams
For detailed component diagrams and state machines, see [block-architecture.md](block-architecture.md).
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### block-architecture.md
````markdown
# Block Package Architecture
Complete technical reference for the ev-node block package.
## Directory Structure
```
block/
├── public.go # Exported types, DA client factory
├── components.go # Component creation and lifecycle
└── internal/
├── common/
│ ├── errors.go # Error definitions
│ ├── event.go # DAHeightEvent, event types
│ ├── metrics.go # Prometheus metrics
│ ├── options.go # BlockOptions configuration
│ ├── expected_interfaces.go
│ └── replay.go # Replayer for crash recovery
├── executing/
│ └── executor.go # Block production loop
├── syncing/
│ ├── syncer.go # Main sync orchestration
│ ├── da_retriever.go # DA block fetching
│ └── p2p_handler.go # P2P block coordination
├── submitting/
│ ├── submitter.go # Main submission loop
│ └── da_submitter.go # DA submission with retries
├── reaping/
│ └── reaper.go # Transaction scraping
├── cache/
│ ├── manager.go # Unified cache interface
│ ├── generic_cache.go # Generic cache impl
│ ├── pending_headers.go # Header tracking
│ └── pending_data.go # Data tracking
└── da/
├── client.go # DA client wrapper
├── interface.go # DA interfaces
├── async_block_retriever.go
└── forced_inclusion_retriever.go
```
## Component Lifecycle
All components implement:
```go
type Component interface {
Start(ctx context.Context) error
Stop() error
}
```
Startup order:
1. Cache Manager (loads persisted state)
2. Syncer (begins sync workers)
3. Executor (begins production loop) - Aggregator only
4. Reaper (begins tx scraping) - Aggregator only
5. Submitter (begins DA submission)
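A sketch of that ordering using the `Component` interface above; the aggregator flag and error handling are illustrative:
```go
// Illustrative startup sketch: start in dependency order, skipping
// aggregator-only components on sync-only nodes.
func startAll(ctx context.Context, c *Components, aggregator bool) error {
    order := []Component{c.Cache, c.Syncer}
    if aggregator {
        order = append(order, c.Executor, c.Reaper)
    }
    order = append(order, c.Submitter)
    for _, comp := range order {
        if err := comp.Start(ctx); err != nil {
            return err // Real code would also stop already-started components
        }
    }
    return nil
}
```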
## Executor (`internal/executing/executor.go`)
Block production for aggregator nodes.
### State
```go
type Executor struct {
lastState *atomic.Pointer[types.State]
sequencer Sequencer
    exec coreexecutor.Executor // execution-layer interface from core/
broadcaster Broadcaster
submitter Submitter
cache Cache
blockTime time.Duration
lazyMode bool
maxPending uint64
}
```
### Main Loop
```go
func (e *Executor) executionLoop(ctx context.Context) {
timer := time.NewTimer(e.blockTime)
for {
select {
case <-ctx.Done():
return
case <-timer.C:
e.produceBlock(ctx)
timer.Reset(e.blockTime)
case <-e.txNotifyCh:
// New txs arrived, produce immediately if not lazy
if !e.lazyMode {
e.produceBlock(ctx)
timer.Reset(e.blockTime)
}
}
}
}
```
### Block Production
```go
func (e *Executor) produceBlock(ctx context.Context) error {
// 1. Check backpressure
if e.cache.PendingCount() >= e.maxPending {
return ErrTooManyPending
}
// 2. Get batch from sequencer
batch, err := e.sequencer.GetNextBatch(ctx)
// 3. Execute transactions
stateRoot, gasUsed, err := e.exec.ExecuteTxs(ctx, batch.Txs, ...)
// 4. Create header
header := &types.Header{
Height: lastState.LastBlockHeight + 1,
Time: time.Now().UnixNano(),
LastHeaderHash: lastState.LastHeaderHash,
DataHash: batch.Txs.Hash(),
AppHash: stateRoot,
ProposerAddress: e.proposer,
}
// 5. Sign header
signedHeader, err := e.signer.SignHeader(header)
// 6. Create data
data := &types.Data{Txs: batch.Txs}
// 7. Update state
newState := lastState.NextState(header, stateRoot)
e.lastState.Store(newState)
// 8. Broadcast to P2P
e.broadcaster.BroadcastHeader(ctx, signedHeader)
e.broadcaster.BroadcastData(ctx, data)
// 9. Queue for DA submission
e.submitter.AddPending(signedHeader, data)
return nil
}
```
## Syncer (`internal/syncing/syncer.go`)
Coordinates block synchronization from multiple sources.
### Workers
```go
func (s *Syncer) startSyncWorkers(ctx context.Context) {
go s.daWorkerLoop(ctx) // DA retrieval
go s.pendingWorkerLoop(ctx) // Pending events
go s.p2pWorkerLoop(ctx) // P2P blocks
}
```
### DA Worker
```go
func (s *Syncer) daWorkerLoop(ctx context.Context) {
for {
// Get next DA height to retrieve
height := s.daRetrieverHeight.Load()
// Retrieve blocks at this DA height
events, err := s.daRetriever.Retrieve(ctx, height)
// Send to processing channel
for _, event := range events {
s.heightInCh <- event
}
// Advance DA height
s.daRetrieverHeight.Add(1)
}
}
```
### P2P Worker
```go
func (s *Syncer) p2pWorkerLoop(ctx context.Context) {
for {
select {
case header := <-s.p2pHandler.HeaderCh():
s.p2pHandler.HandleHeader(header)
case data := <-s.p2pHandler.DataCh():
s.p2pHandler.HandleData(data)
case event := <-s.p2pHandler.EventCh():
// Complete header+data pair received
s.heightInCh <- event
}
}
}
```
### Process Loop
```go
func (s *Syncer) processLoop(ctx context.Context) {
for {
select {
case event := <-s.heightInCh:
if err := s.processHeightEvent(ctx, event); err != nil {
// Log error, continue
}
case <-ctx.Done():
return
}
}
}
func (s *Syncer) processHeightEvent(ctx context.Context, event DAHeightEvent) error {
// 1. Validate header signature
if err := s.verifyHeader(event.SignedHeader); err != nil {
return err
}
// 2. Validate data hash matches header
if event.SignedHeader.DataHash != event.Data.Hash() {
return ErrDataHashMismatch
}
// 3. Execute transactions
stateRoot, _, err := s.exec.ExecuteTxs(ctx, event.Data.Txs, ...)
// 4. Verify state root
if stateRoot != event.SignedHeader.AppHash {
return ErrStateRootMismatch
}
// 5. Update state
newState := s.lastState.NextState(event.SignedHeader.Header, stateRoot)
s.lastState.Store(newState)
// 6. Persist to store
s.store.SaveBlock(event.SignedHeader, event.Data, newState)
return nil
}
```
## Submitter (`internal/submitting/submitter.go`)
Manages DA submission with retries and inclusion tracking.
### Two Loops
```go
func (s *Submitter) Start(ctx context.Context) error {
go s.daSubmissionLoop(ctx) // Submit to DA
go s.inclusionProcessingLoop(ctx) // Track inclusion
return nil
}
```
### DA Submission Loop
```go
func (s *Submitter) daSubmissionLoop(ctx context.Context) {
for {
// Get pending headers
headers := s.cache.GetPendingHeaders()
if len(headers) > 0 {
if err := s.submitHeaders(ctx, headers); err != nil {
s.handleSubmitError(err)
continue
}
}
// Get pending data
data := s.cache.GetPendingData()
if len(data) > 0 {
if err := s.submitData(ctx, data); err != nil {
s.handleSubmitError(err)
continue
}
}
time.Sleep(s.submitInterval)
}
}
```
### Retry Policy
```go
type DASubmitter struct {
maxRetries int
initialBackoff time.Duration
maxBackoff time.Duration
}
func (d *DASubmitter) Submit(ctx context.Context, blob []byte) error {
backoff := d.initialBackoff
for attempt := 0; attempt < d.maxRetries; attempt++ {
status, err := d.client.Submit(ctx, blob)
switch status {
case StatusSuccess:
return nil
case StatusTooBig:
return d.splitAndSubmit(ctx, blob)
case StatusAlreadyInMempool:
return nil // Already submitted
case StatusNotIncludedInBlock:
time.Sleep(backoff)
backoff = min(backoff*2, d.maxBackoff)
continue
default:
return err
}
}
return ErrMaxRetriesExceeded
}
```
## Forced Inclusion (`internal/da/forced_inclusion_retriever.go`)
Prevents sequencer censorship.
### Grace Period Calculation
```go
func (r *ForcedInclusionRetriever) calculateGracePeriod() uint64 {
// Base period: 1 epoch
basePeriod := r.epochLength
// Adjust based on block fullness
// Higher fullness = longer grace period (congestion tolerance)
ema := r.blockFullnessEMA.Load()
if ema > 0.8 {
// High congestion, extend grace period
return basePeriod * 2
}
return basePeriod
}
```
### Pending TX Tracking
```go
type PendingForcedTx struct {
Tx types.Tx
DAHeight uint64 // When tx appeared in DA
GraceDeadline uint64 // DA height deadline for inclusion
}
func (r *ForcedInclusionRetriever) checkPending(currentDAHeight uint64) {
for _, pending := range r.pendingTxs {
if currentDAHeight > pending.GraceDeadline {
// Sequencer failed to include tx
r.markSequencerMalicious(pending)
// Force include the tx
r.forceInclude(pending.Tx)
}
}
}
```
## Cache Manager (`internal/cache/manager.go`)
Unified cache for headers, data, and transactions.
### Structure
```go
type Manager struct {
headerCache *GenericCache[types.Hash, HeaderEntry]
dataCache *GenericCache[types.Hash, DataEntry]
txCache *GenericCache[types.Hash, TxEntry]
pendingEvents map[uint64]*DAHeightEvent
cleanupTicker *time.Ticker
retentionTime time.Duration
}
```
### Key Operations
```go
// Header tracking
func (m *Manager) IsHeaderSeen(hash types.Hash) bool
func (m *Manager) SetHeaderSeen(hash types.Hash, height uint64)
func (m *Manager) GetHeaderDAIncluded(hash types.Hash) (uint64, bool)
func (m *Manager) SetHeaderDAIncluded(hash types.Hash, daHeight uint64)
// Transaction deduplication
func (m *Manager) IsTxSeen(hash types.Hash) bool
func (m *Manager) SetTxSeen(hash types.Hash)
// Pending management
func (m *Manager) GetPendingHeaders() []*types.SignedHeader
func (m *Manager) GetPendingData() []*types.Data
func (m *Manager) PendingCount() uint64
```
### Disk Persistence
```go
func (m *Manager) SaveToDisk(path string) error {
    state := &CacheState{
        Headers: m.headerCache.Entries(),
        Data:    m.dataCache.Entries(),
        Pending: m.pendingEvents,
    }
    raw, err := json.Marshal(state)
    if err != nil {
        return err
    }
    return os.WriteFile(path, raw, 0o600)
}
func (m *Manager) LoadFromDisk(path string) error {
    raw, err := os.ReadFile(path)
    if err != nil {
        return err
    }
    var state CacheState
    if err := json.Unmarshal(raw, &state); err != nil {
        return err
    }
    // Restore caches from state
    return nil
}
```
## Replayer (`internal/common/replay.go`)
Syncs execution layer after crash.
```go
func (r *Replayer) Replay(ctx context.Context) error {
// Get heights
storeHeight := r.store.GetLastHeight()
execHeight := r.exec.GetHeight()
if execHeight >= storeHeight {
return nil // Already synced
}
// Replay missing blocks
for height := execHeight + 1; height <= storeHeight; height++ {
header, data, err := r.store.GetBlock(height)
if err != nil {
return err
}
_, _, err = r.exec.ExecuteTxs(ctx, data.Txs, ...)
if err != nil {
return err
}
}
return nil
}
```
## Metrics (`internal/common/metrics.go`)
```go
var (
Height = prometheus.NewGauge(...)
NumTxs = prometheus.NewGauge(...)
BlockSizeBytes = prometheus.NewHistogram(...)
CommittedHeight = prometheus.NewGauge(...)
TxsPerBlock = prometheus.NewHistogram(...)
OperationDuration = prometheus.NewHistogramVec(...)
// DA metrics
DASubmitterFailures = prometheus.NewCounterVec(...)
DASubmitterLastFailure = prometheus.NewGauge(...)
DASubmitterPendingBlobs = prometheus.NewGauge(...)
DARetrievalAttempts = prometheus.NewCounter(...)
DARetrievalSuccesses = prometheus.NewCounter(...)
DARetrievalFailures = prometheus.NewCounter(...)
DAInclusionHeight = prometheus.NewGauge(...)
// Cache metrics
PendingHeadersCount = prometheus.NewGauge(...)
PendingDataCount = prometheus.NewGauge(...)
// Forced inclusion
ForcedInclusionTxsInGracePeriod = prometheus.NewGauge(...)
ForcedInclusionTxsMalicious = prometheus.NewCounter(...)
)
```
## Configuration
Key options in `BlockOptions`:
```go
type BlockOptions struct {
BlockTime time.Duration // Block interval
LazyBlockInterval time.Duration // Lazy mode timeout
MaxPendingHeadersAndData uint64 // Backpressure limit
BasedSequencer bool // No DA submissions
DABlockTime time.Duration // DA block interval
ScrapeInterval time.Duration // Tx reaping frequency
// Namespaces
HeaderNamespace []byte
DataNamespace []byte
ForcedInclusionNamespace []byte
}
```
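A hedged example of populating these options; all values below are illustrative, not recommended defaults:
```go
// Illustrative values only; consult the real defaults before use.
opts := BlockOptions{
    BlockTime:                1 * time.Second,
    LazyBlockInterval:        30 * time.Second,
    MaxPendingHeadersAndData: 100,
    BasedSequencer:           false,
    DABlockTime:              6 * time.Second,
    ScrapeInterval:           1 * time.Second,
    HeaderNamespace:          headerNS, // 29-byte namespaces built elsewhere
    DataNamespace:            dataNS,
    ForcedInclusionNamespace: forcedNS,
}
```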
## Error Types
```go
var (
ErrNoHeader = errors.New("no header found")
ErrNoData = errors.New("no data found")
ErrDataHashMismatch = errors.New("data hash does not match header")
ErrStateRootMismatch = errors.New("state root mismatch after execution")
ErrInvalidSignature = errors.New("invalid header signature")
ErrTooManyPending = errors.New("too many pending submissions")
ErrMaxRetriesExceeded = errors.New("max DA submission retries exceeded")
ErrSequencerMalicious = errors.New("sequencer failed to include forced tx")
)
```
## State Machines
### Executor State Machine
```
┌──────────────┐
│ IDLE │
└──────┬───────┘
│ BlockTime elapsed OR TxNotify
▼
┌──────────────┐
│ CHECK_PENDING│──── Too many? ───► Wait
└──────┬───────┘
│ OK
▼
┌──────────────┐
│ GET_BATCH │
└──────┬───────┘
│
▼
┌──────────────┐
│ EXECUTE_TXS │
└──────┬───────┘
│
▼
┌──────────────┐
│ CREATE_BLOCK │
└──────┬───────┘
│
▼
┌──────────────┐
│ BROADCAST │
└──────┬───────┘
│
▼
┌──────────────┐
│ QUEUE_SUBMIT │───► Back to IDLE
└──────────────┘
```
### Syncer State Machine
```
┌─────────────────────────────────────────┐
│ START │
└──────────────────┬──────────────────────┘
│
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────────┐
│ DA │ │ P2P │ │ FORCED │
│WORKER │ │WORKER │ │ INCLUSION │
└───┬───┘ └───┬───┘ └─────┬─────┘
│ │ │
└────────────┴──────────────┘
│
▼
┌──────────────┐
│ PROCESS_LOOP │
└──────┬───────┘
│
▼
┌──────────────┐
│ VALIDATE │──── Invalid? ───► Log, skip
└──────┬───────┘
│ Valid
▼
┌──────────────┐
│ EXECUTE │
└──────┬───────┘
│
▼
┌──────────────┐
│ UPDATE_STATE│
└──────┬───────┘
│
▼
┌──────────────┐
│ PERSIST │───► Back to PROCESS_LOOP
└──────────────┘
```
### Submitter State Machine
```
┌──────────────┐
│ START │
└──────┬───────┘
│
├─────────────────────┐
│ │
▼ ▼
┌──────────────┐ ┌──────────────────┐
│ SUBMIT_LOOP │ │ INCLUSION_LOOP │
└──────┬───────┘ └────────┬─────────┘
│ │
▼ ▼
┌──────────────┐ ┌──────────────────┐
│ GET_PENDING │ │ CHECK_DA_HEIGHT │
└──────┬───────┘ └────────┬─────────┘
│ │
▼ │ Included?
┌──────────────┐ │
│ SUBMIT │ ▼
└──────┬───────┘ ┌──────────────────┐
│ │ RESET_STATE │
│ Failed? └────────┬─────────┘
▼ │
┌──────────────┐ │
│ RETRY │ │
│ (backoff) │ │
└──────┬───────┘ │
│ │
└─────────────────────┘
```
````
### da-sequencing.md
````markdown
# Data Availability & Sequencing Architecture
Deep dive into ev-node's DA layer and sequencing system.
## Data Availability Layer
### Overview
The DA layer abstracts blob storage and retrieval. ev-node uses Celestia as the primary DA implementation but the interface is pluggable.
### Directory Structure
```
pkg/da/
├── types/
│ ├── types.go # Core DA types (Blob, ID, Commitment, Proof)
│ ├── namespace.go # Namespace handling (29 bytes: version + ID)
│ └── errors.go # Error definitions
├── selector.go # Round-robin address selection
└── jsonrpc/ # Celestia JSON-RPC client
block/internal/da/
├── client.go # DA client wrapper
├── interface.go # Client + Verifier interfaces
├── forced_inclusion_retriever.go # Epoch-based forced tx retrieval
└── async_block_retriever.go # Background prefetching
```
### Core Types
```go
// Status codes for DA operations
type StatusCode uint64

const (
    StatusSuccess StatusCode = iota
StatusNotFound
StatusNotIncludedInBlock
StatusAlreadyInMempool
StatusTooBig
StatusContextDeadline
StatusError
StatusIncorrectAccountSequence
StatusContextCanceled
StatusHeightFromFuture
)
// Blob primitives
type Blob = []byte // Data submitted to DA
type ID = []byte // Height + commitment to locate blob
type Commitment = []byte // Cryptographic commitment
type Proof = []byte // Inclusion proof
```
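One plausible way to pack `ID = height ++ commitment`; the actual byte layout is defined in `pkg/da/types/types.go`, and the 8-byte big-endian prefix here is an assumption for illustration:
```go
// Assumed layout for illustration: 8-byte big-endian height, then commitment.
func makeID(height uint64, commitment Commitment) ID {
    id := make([]byte, 8+len(commitment))
    binary.BigEndian.PutUint64(id[:8], height)
    copy(id[8:], commitment)
    return id
}

func splitID(id ID) (uint64, Commitment) {
    return binary.BigEndian.Uint64(id[:8]), Commitment(id[8:])
}
```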
### Namespace Format
Namespaces are 29 bytes:
- **Version** (1 byte): Protocol version (max 255)
- **ID** (28 bytes): Namespace identifier
Version 0 rules:
- First 18 bytes of ID must be zero
- Leaves 10 bytes for user data
```go
func NewNamespaceV0(id []byte) (Namespace, error) {
if len(id) > 10 {
return Namespace{}, ErrInvalidNamespaceLength
}
ns := Namespace{Version: 0}
    copy(ns.ID[28-len(id):], id) // id occupies the trailing bytes; leading bytes stay zero
return ns, nil
}
```
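For example, the three namespaces can be derived from short identifiers (the strings are illustrative; errors elided):
```go
// Identifiers must be at most 10 bytes under version 0.
headerNS, _ := NewNamespaceV0([]byte("myroll-hdr"))
dataNS, _ := NewNamespaceV0([]byte("myroll-dat"))
forcedNS, _ := NewNamespaceV0([]byte("myroll-fi"))
```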
### DA Client Interface
```go
type Client interface {
// Submit blobs to DA layer
Submit(ctx context.Context, data [][]byte, gasPrice float64,
namespace []byte, options []byte) ResultSubmit
// Retrieve all blobs at height for namespace
Retrieve(ctx context.Context, height uint64,
namespace []byte) ResultRetrieve
// Get specific blobs by ID
Get(ctx context.Context, ids []ID, namespace []byte) ([]Blob, error)
// Namespace accessors
GetHeaderNamespace() []byte
GetDataNamespace() []byte
GetForcedInclusionNamespace() []byte
HasForcedInclusionNamespace() bool
}
type Verifier interface {
GetProofs(ctx context.Context, ids []ID, namespace []byte) ([]Proof, error)
Validate(ctx context.Context, ids []ID, proofs []Proof,
namespace []byte) ([]bool, error)
}
type FullClient interface {
Client
Verifier
}
```
### Submit Flow
```go
func (c *Client) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) ResultSubmit {
// 1. Validate blob size
for _, blob := range data {
if len(blob) > DefaultMaxBlobSize {
return ResultSubmit{Code: StatusTooBig}
}
}
// 2. Create Celestia blobs with namespace
blobs := make([]*blob.Blob, len(data))
for i, d := range data {
blobs[i], _ = blob.NewBlobV0(namespace, d)
}
// 3. Submit via RPC
height, err := c.blobRPC.Submit(ctx, blobs, submitOptions)
// 4. Return result with IDs
return ResultSubmit{
Code: StatusSuccess,
Height: height,
IDs: createIDs(height, blobs),
}
}
```
### Retrieve Flow
```go
func (c *Client) Retrieve(ctx context.Context, height uint64, namespace []byte) ResultRetrieve {
// 1. Fetch all blobs at height
blobs, err := c.blobRPC.GetAll(ctx, height, []Namespace{namespace})
// 2. Handle errors
if errors.Is(err, ErrBlobNotFound) {
return ResultRetrieve{Code: StatusNotFound}
}
if errors.Is(err, ErrHeightFromFuture) {
return ResultRetrieve{Code: StatusHeightFromFuture}
}
// 3. Get timestamp from DA header
header, _ := c.headerRPC.GetByHeight(ctx, height)
// 4. Extract blob data
data := make([][]byte, len(blobs))
for i, b := range blobs {
data[i] = b.Data()
}
return ResultRetrieve{
Code: StatusSuccess,
Height: height,
Timestamp: header.Time().UnixNano(),
Data: data,
}
}
```
### Address Selection
For Cosmos SDK compatibility (preventing sequence mismatches):
```go
type RoundRobinSelector struct {
addresses []string
counter atomic.Uint64
}
func (s *RoundRobinSelector) Next() string {
idx := s.counter.Add(1) % uint64(len(s.addresses))
return s.addresses[idx]
}
```
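Rotating the signing address means consecutive submissions are signed by different accounts, so a transaction still pending in one account's mempool cannot cause a sequence-number mismatch for the next submission.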
---
## Sequencing System
### Overview
Sequencers order transactions for block production. ev-node supports two modes:
- **Single Sequencer**: Hybrid (mempool + forced inclusion)
- **Based Sequencer**: Pure DA (only forced inclusion)
### Directory Structure
```
core/sequencer/
├── sequencing.go # Core interface
└── dummy.go # Test implementation
pkg/sequencers/
├── single/
│ ├── sequencer.go # Hybrid sequencer
│ └── queue.go # Persistent batch queue
├── based/
│ └── sequencer.go # Pure DA sequencer
└── common/
└── checkpoint.go # Shared checkpoint logic
```
### Core Interface
```go
type Sequencer interface {
// Submit transactions from reaper to sequencer
    SubmitBatchTxs(ctx context.Context, req SubmitBatchTxsRequest) (*SubmitBatchTxsResponse, error)
// Get next batch for block production
    GetNextBatch(ctx context.Context, req GetNextBatchRequest) (*GetNextBatchResponse, error)
// Verify batch was included in DA
    VerifyBatch(ctx context.Context, req VerifyBatchRequest) (*VerifyBatchResponse, error)
// DA height tracking for forced inclusion
SetDAHeight(height uint64)
GetDAHeight() uint64
}
```
### Batch Structure
```go
type Batch struct {
Transactions [][]byte
// ForceIncludedMask[i] == true: From DA (MUST validate)
// ForceIncludedMask[i] == false: From mempool (already validated)
// nil: Backward compatibility (validate all)
ForceIncludedMask []bool
}
```
### Single Sequencer (Hybrid)
Accepts both mempool transactions and forced inclusion from DA.
**Components:**
1. **BatchQueue** - Persistent mempool storage
```go
type BatchQueue struct {
db DB
maxSize uint64
nextSeq uint64 // Starts at 0x8000000000000000
}
func (q *BatchQueue) AddBatch(batch [][]byte) error
func (q *BatchQueue) Next() ([][]byte, error)
func (q *BatchQueue) Prepend(batch [][]byte) error // Return unused txs
```
2. **Checkpoint** - Tracks position in DA epoch
```go
type Checkpoint struct {
DAHeight uint64 // Current DA height being processed
TxIndex uint64 // Position within epoch
}
```
**GetNextBatch Flow:**
```go
func (s *SingleSequencer) GetNextBatch(ctx, req) (*Response, error) {
// 1. Check if need next DA epoch
if s.checkpoint.DAHeight > 0 && len(s.cachedForcedTxs) == 0 {
s.fetchNextDAEpoch(ctx)
}
// 2. Process forced txs from checkpoint
forcedTxs, forcedBytes := s.processForcedTxs(req.MaxBytes)
// 3. Get mempool txs (remaining space)
mempoolTxs := s.queue.Next()
mempoolTxs = truncateToSize(mempoolTxs, req.MaxBytes - forcedBytes)
// 4. Return unused mempool txs to queue
s.queue.Prepend(unusedTxs)
// 5. Combine batches
batch := &Batch{
Transactions: append(forcedTxs, mempoolTxs...),
ForceIncludedMask: makeMask(len(forcedTxs), len(mempoolTxs)),
}
// 6. Update and persist checkpoint
s.updateCheckpoint()
return &Response{Batch: batch, Timestamp: s.timestamp}
}
```
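`makeMask` is not shown above; a plausible definition, given that forced txs are placed first in the combined batch:
```go
// Plausible sketch: the leading entries (forced txs) are true,
// the mempool tail is false.
func makeMask(numForced, numMempool int) []bool {
    mask := make([]bool, numForced+numMempool)
    for i := 0; i < numForced; i++ {
        mask[i] = true
    }
    return mask
}
```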
### Based Sequencer (Pure DA)
Only processes forced inclusion transactions. No mempool.
**Key Differences:**
```go
func (s *BasedSequencer) SubmitBatchTxs(ctx, req) (*Response, error) {
// No-op: Ignores mempool transactions
return &SubmitBatchTxsResponse{}, nil
}
func (s *BasedSequencer) GetNextBatch(ctx, req) (*Response, error) {
// Only returns forced inclusion txs
txs := s.fetchForcedInclusion(ctx)
// Timestamp spread: prevents duplicate timestamps
// timestamp = DAEpochEndTime - (remainingTxs * 1ms)
timestamp := s.calculateSpreadTimestamp()
return &Response{
Batch: &Batch{Transactions: txs},
Timestamp: timestamp,
}
}
func (s *BasedSequencer) VerifyBatch(ctx, req) (*Response, error) {
// Always true: All txs come from DA (already verified)
return &VerifyBatchResponse{Status: true}, nil
}
```
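A sketch of the timestamp-spread formula from the comment above; the field names are illustrative:
```go
// timestamp = DAEpochEndTime - remainingTxs * 1ms, so txs drawn from
// the same DA epoch get distinct, strictly increasing timestamps.
func (s *BasedSequencer) calculateSpreadTimestamp() time.Time {
    spread := time.Duration(s.remainingTxs) * time.Millisecond
    return s.daEpochEndTime.Add(-spread)
}
```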
### Forced Inclusion Flow
```
User submits tx to DA forced-inclusion namespace
│
▼
DA stores tx at height H
│
▼
Sequencer detects epoch boundary
│
▼
ForcedInclusionRetriever.Retrieve(epochStart, epochEnd)
│
├── AsyncBlockRetriever checks cache
│ │
│ ├── Cache hit: Return cached block
│ │
│ └── Cache miss: Sync fetch from DA
│
▼
Return ForcedInclusionEvent{Txs, Timestamp}
│
▼
Sequencer caches txs, updates checkpoint
│
▼
GetNextBatch returns txs with ForceIncludedMask[i]=true
│
▼
Executor passes mask to execution layer
│
▼
Execution layer validates forced txs (skips mempool validation)
```
### Async Block Retriever
Background prefetching reduces latency:
```go
type AsyncBlockRetriever struct {
    client        DAClient
    namespace     []byte            // Namespace to prefetch from
    cache         map[uint64]*Block // In-memory cache
    currentHeight atomic.Uint64
    prefetchSize  uint64        // 2x epoch size
    pollInterval  time.Duration // DA block time
}
func (r *AsyncBlockRetriever) Start(ctx context.Context) {
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(r.pollInterval):
            r.prefetch(ctx)
}
}
}()
}
func (r *AsyncBlockRetriever) prefetch(ctx context.Context) {
current := r.currentHeight.Load()
end := current + r.prefetchSize
for h := current; h < end; h++ {
if _, exists := r.cache[h]; !exists {
            block, _ := r.client.Retrieve(ctx, h, r.namespace)
r.cache[h] = block
}
}
// Cleanup old entries
r.cleanupBefore(current - r.prefetchSize)
}
```
---
## Integration with Block Package
### Executor Integration
```go
func (e *Executor) initializeState() error {
state := e.store.GetState()
// Sync sequencer DA height with stored state
e.sequencer.SetDAHeight(state.DAHeight)
return nil
}
func (e *Executor) produceBlock(ctx context.Context) error {
// 1. Get batch from sequencer
resp, _ := e.sequencer.GetNextBatch(ctx, GetNextBatchRequest{
Id: e.genesis.ChainID,
MaxBytes: DefaultMaxBlobSize,
})
// 2. Pass ForceIncludedMask to execution layer
ctx = WithForceIncludedMask(ctx, resp.Batch.ForceIncludedMask)
// 3. Execute transactions
stateRoot, _ := e.exec.ExecuteTxs(ctx, resp.Batch.Transactions, ...)
// 4. Update state with new DA height
newState := &State{
DAHeight: e.sequencer.GetDAHeight(),
// ...
}
// 5. Create and broadcast block
// ...
}
```
### Configuration
```go
type DAConfig struct {
Address string // Celestia RPC endpoint
AuthToken string // Auth token
Namespace string // Header namespace
DataNamespace string // Data namespace (optional)
ForcedInclusionNamespace string // Forced inclusion namespace
BlockTime Duration // DA block time
SubmitOptions string // JSON gas settings
SigningAddresses []string // Round-robin addresses
MaxSubmitAttempts int // Retry limit
RequestTimeout Duration // Per-request timeout
}
type NodeConfig struct {
Aggregator bool // Enable block production
BasedSequencer bool // Use based sequencer (requires Aggregator)
BlockTime Duration // App block time
LazyMode bool // Only produce on txs
}
```
### Genesis Configuration
```go
type Genesis struct {
DAStartHeight uint64 // First DA height (0 at genesis)
DAEpochForcedInclusion uint64 // Epoch size (default 50)
}
```
---
## Key Design Decisions
### 1. ForceIncludedMask Optimization
Distinguishes DA-sourced (untrusted) from mempool (trusted) transactions:
- Execution layer validates forced txs
- Skips redundant validation for mempool txs
- Significant performance improvement
### 2. Epoch-Based Processing
Only retrieves forced inclusion at epoch boundaries:
- Reduces DA queries
- Enables batching
- Checkpoint ensures resumable processing
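Epoch boundaries follow from the genesis parameters shown earlier; a sketch (the helper name is illustrative, and `daHeight >= DAStartHeight` is assumed):
```go
// Sketch: map a DA height to its forced-inclusion epoch bounds using
// Genesis.DAStartHeight and Genesis.DAEpochForcedInclusion (default 50).
func epochBounds(g Genesis, daHeight uint64) (start, end uint64) {
    epoch := (daHeight - g.DAStartHeight) / g.DAEpochForcedInclusion
    start = g.DAStartHeight + epoch*g.DAEpochForcedInclusion
    end = start + g.DAEpochForcedInclusion - 1
    return start, end
}
```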
### 3. Async Prefetching
Background goroutine prefetches 2x epoch size ahead:
- Reduces latency when sequencer needs txs
- Cache misses fall back to sync fetch
- Bounded memory via cleanup
### 4. Namespace Strategy
Three separate namespaces:
- **Header**: Block headers (required)
- **Data**: Transaction data (optional, can share with header)
- **Forced Inclusion**: User-submitted txs for censorship resistance
### 5. Crash Recovery
Both sequencers persist state:
- **Checkpoint**: DAHeight + TxIndex position
- **Queue**: Pending mempool batches
- Protobuf serialization to DB
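A sketch of checkpoint persistence; `pb.Checkpoint` and the storage key are hypothetical stand-ins for the real protobuf type and schema:
```go
// Hypothetical persistence sketch using the Checkpoint fields above.
func saveCheckpoint(db DB, cp Checkpoint) error {
    raw, err := proto.Marshal(&pb.Checkpoint{ // pb type is hypothetical
        DaHeight: cp.DAHeight,
        TxIndex:  cp.TxIndex,
    })
    if err != nil {
        return err
    }
    return db.Put([]byte("sequencer/checkpoint"), raw)
}
```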
### 6. Single vs Based Mode
| Aspect | Single | Based |
|--------|--------|-------|
| Mempool | Yes | No |
| Forced Inclusion | Yes | Yes (only source) |
| SubmitBatchTxs | Stores in queue | No-op |
| VerifyBatch | Validates proofs | Always true |
| Use Case | Traditional rollup | High liveness |
````