Skip to content

Messaging Protocol

The messaging protocol provides request-response semantics for targeted peer-to-peer communication. Unlike GossipSub which broadcasts to all peers, messaging sends a request to a specific peer and waits for a response.

Protocol ID: /xe/msg/1.0.0

Wire Types

MsgRequest

type MsgRequest struct {
    Type    string          `json:"type"`
    Payload json.RawMessage `json:"payload"`
}

MsgResponse

type MsgResponse struct {
    Type    string          `json:"type"`
    Error   string          `json:"error,omitempty"`
    Payload json.RawMessage `json:"payload"`
}

MsgHandler

type MsgHandler func(from peer.ID, payload json.RawMessage) (json.RawMessage, error)

Handlers receive the sender's peer ID and the raw JSON payload. They return a raw JSON response or an error.

Constants

Constant Value Description
MsgProtocol /xe/msg/1.0.0 Stream protocol identifier
MsgStreamDeadline 30 seconds Read/write timeout per stream
MsgMaxRequestSize 64 KB (65,536 bytes) Maximum incoming request size
MsgMaxResponseSize 64 KB (65,536 bytes) Maximum incoming response size

Messenger

The Messenger struct manages handler registration and request dispatching:

type Messenger struct {
    host     host.Host
    dht      *dht.IpfsDHT
    handlers map[string]MsgHandler
    mu       sync.RWMutex
}

Construction

func NewMessenger(h host.Host, d *dht.IpfsDHT) *Messenger

NewMessenger creates a Messenger and registers the /xe/msg/1.0.0 stream handler on the host. The DHT is used for peer discovery when the target peer is not in the peerstore.

Registering Handlers

func (m *Messenger) Handle(msgType string, h MsgHandler)

Registers a handler for a specific message type. Handlers are stored in a thread-safe map and dispatched by the Type field of incoming requests.

Note

If a request arrives with an unregistered message type, the Messenger responds with an error: "unknown message type: {type}".

Sending Requests

func (m *Messenger) Request(ctx context.Context, target peer.ID, msgType string, payload any) (json.RawMessage, error)

Sends a typed request to a specific peer and waits for the response. The flow:

  1. Marshal payload -- The payload (any type) is JSON-marshaled into json.RawMessage
  2. Find peer -- If the target is not in the peerstore and a DHT is available, FindPeer() is called to locate the peer and Connect() establishes a connection
  3. Open stream -- A new stream is opened to the target on protocol /xe/msg/1.0.0
  4. Set deadline -- Uses the context deadline if set, otherwise defaults to 30 seconds
  5. Send request -- JSON-encodes the MsgRequest and calls CloseWrite() to signal completion
  6. Read response -- JSON-decodes the MsgResponse from a size-limited reader
  7. Check error -- If the response contains an Error field, returns it as a Go error

Request-Response Flow

Client                                    Server
──────                                    ──────
  │                                          │
  │  Request(ctx, target, type, payload)     │
  │                                          │
  │  1. Find peer via DHT (if needed)        │
  │  2. Connect (if needed)                  │
  │  3. Open stream                          │
  │                                          │
  │  MsgRequest{Type, Payload} ────────────▶ │
  │  CloseWrite() ─────────────────────────▶ │
  │                                          │  Lookup handler by Type
  │                                          │  Call handler(from, payload)
  │                                          │
  │  ◀──────────── MsgResponse{Type, Payload}│
  │                                          │
  │  Close stream                            │
  │                                          │

Peer Discovery via DHT

When Request() is called for a peer not in the peerstore, the Messenger uses the Kademlia DHT to find the peer:

if len(m.host.Peerstore().Addrs(target)) == 0 && m.dht != nil {
    pi, err := m.dht.FindPeer(ctx, target)
    // ... connect to discovered peer
}

This makes messaging work even when the sender has never directly connected to the recipient, as long as both are part of the DHT.

Message Types

The messaging protocol is generic -- the Type field determines which handler processes the request. The following message types are registered as point-to-point handlers:

Type Direction Purpose
vm_credentials Provider → Consumer Delivers VM SSH connection details after provisioning
vm_status Consumer → Provider Queries current VM status for a lease
account_chat Any → Any Sends a chat message between accounts
attest_timestamp Provider → Timekeeper Requests a signed timestamp attestation for a lease

Gossip vs messaging

Block propagation, votes, marketplace negotiations, directory updates, and state chain blocks all use GossipSub (pubsub topics), not the point-to-point messaging protocol. See Gossip for those message types.

Payload Types

ResourceAdvertisement

type ResourceAdvertisement struct {
    Provider  string `json:"provider"`
    VCPUs     uint64 `json:"vcpus"`
    MemoryMB  uint64 `json:"memory_mb"`
    DiskGB    uint64 `json:"disk_gb"`
    Timestamp int64  `json:"timestamp"`
    Signature string `json:"signature"`
}

ResourceRequest

type ResourceRequest struct {
    Consumer  string `json:"consumer"`
    RequestID string `json:"request_id"`
    VCPUs     uint64 `json:"vcpus"`
    MemoryMB  uint64 `json:"memory_mb"`
    DiskGB    uint64 `json:"disk_gb"`
    Duration  uint64 `json:"duration"`  // seconds
    Timestamp int64  `json:"timestamp"`
    Signature string `json:"signature"`
}

ResourceOffer

type ResourceOffer struct {
    Provider  string `json:"provider"`
    RequestID string `json:"request_id"`
    VCPUs     uint64 `json:"vcpus"`
    MemoryMB  uint64 `json:"memory_mb"`
    DiskGB    uint64 `json:"disk_gb"`
    Duration  uint64 `json:"duration"`
    TotalCost uint64 `json:"total_cost"`
    Timestamp int64  `json:"timestamp"`
    Signature string `json:"signature"`
}

StateChainSyncRequest / Response

type StateChainSyncRequest struct {
    TipIndex int64 `json:"tip_index"` // -1 if empty chain
}

type StateChainSyncResponse struct {
    Blocks  []*statechain.Block `json:"blocks"`
    HasMore bool                `json:"has_more"`
}

MarketplaceMsg

type MarketplaceMsg struct {
    Type    string                 `json:"type"` // "advertisement", "request", "offer"
    Ad      *ResourceAdvertisement `json:"ad,omitempty"`
    Request *ResourceRequest       `json:"request,omitempty"`
    Offer   *ResourceOffer         `json:"offer,omitempty"`
}

Error Handling

Errors can occur at multiple levels:

Level Handling
Peer not found (DHT) Request() returns error
Connection failure Request() returns error
Stream open failure Request() returns error
Timeout (30s deadline) Stream read/write fails
Unknown message type Server responds with error in MsgResponse.Error
Handler returns error Server responds with error in MsgResponse.Error
Decode failure Server logs error, no response sent

Comparison with GossipSub

Messaging GossipSub
Target Specific peer All peers
Pattern Request-response Publish-subscribe
Delivery Reliable (or error) Best-effort
Use case Marketplace negotiation, targeted sync Block/vote broadcast
Size limit 64 KB 256 KB
Timeout 30 seconds None (async)

Use messaging when you need a response from a specific peer. Use gossip when you need to broadcast to the entire network.