Payments

Payment Events

Real-time payment notifications via Server-Sent Events (SSE)

MoneyMQ provides a pub/sub event system over Server-Sent Events (SSE) that enables real-time communication between the payment backend and client applications.

Key Concepts

RoleDescriptionUse Case
ListenerSubscribe-only clientFrontend apps that only receive events
PaymentStreamTransaction listenerBackend apps that handle multiple concurrent transactions
PaymentHookSubscribe + attach clientBackend hooks spawned per-transaction

Event Types

Payment events are automatically published to channels when payments are processed:

EventDescription
payment:verifiedPayment signature verified on-chain
payment:settledPayment settled to recipient
payment:failedPayment verification or settlement failed

Hooks can attach fulfillment data that gets included in signed receipts.


Listener (Frontend)

Use a listener when your frontend needs to receive payment events in real-time.

import { MoneyMQ } from '@moneymq/sdk';

const moneymq = new MoneyMQ({
  endpoint: 'http://localhost:8488',
});

// Create a listener for a specific channel (e.g., transaction ID)
const listener = moneymq.payment.listener('tx-abc123');

// Stream events using async iterator
for await (const event of listener.events()) {
  if (event.type === 'payment:verified') {
    console.log('Payment verified:', event.data.payer);
  } else if (event.type === 'payment:settled') {
    console.log('Payment settled:', event.data.amount);
    showSuccessMessage('Payment received!');
  } else if (event.type === 'order:completed') {
    updateUI(event.data.trackingNumber);
  }
}
import { MoneyMQ } from '@moneymq/sdk';

const moneymq = new MoneyMQ({
  endpoint: 'http://localhost:8488',
});

// Create a listener for a specific channel (e.g., transaction ID)
const listener = moneymq.payment.listener('tx-abc123');

// Subscribe to payment events
listener.on('payment:verified', (event) => {
  console.log('Payment verified:', event.data.payer);
});

listener.on('payment:settled', (event) => {
  console.log('Payment settled:', event.data.amount);
  showSuccessMessage('Payment received!');
});

// Custom events from backend actor
listener.on('order:completed', (event) => {
  updateUI(event.data.trackingNumber);
});

// Connection lifecycle
listener.on('connected', () => console.log('Connected'));
listener.on('disconnected', () => console.log('Disconnected'));
listener.on('error', (err) => console.error(err));

// Connect to start receiving events
listener.connect();

// Disconnect when done (e.g., on component unmount)
listener.disconnect();

Replay Events

Replay recent events when connecting:

// Replay last 10 events on connect
const listener = moneymq.payment.listener('tx-abc123', {
  replay: 10,
});

// Works with both async iterator and callbacks
for await (const event of listener.events()) {
  // ...
}

PaymentStream (Backend)

Use a payment stream when your backend handles multiple concurrent transactions. The stream listens for all new transactions and spawns hooks for each one.

import { MoneyMQ } from '@moneymq/sdk';

const moneymq = new MoneyMQ({
  endpoint: 'http://localhost:8488',
  secret: process.env.MONEYMQ_SECRET,
});

// Create a payment stream (listens for all new transactions)
const stream = moneymq.payment.paymentStream();

// Stream transactions using async iterator
for await (const tx of stream.transactions()) {
  console.log('New transaction:', tx.id, tx.payment?.amount, tx.basket[0]?.productId);

  // Stream events for this transaction
  for await (const event of tx.events()) {
    if (event.type === 'payment:settled') {
      // Process the payment
      const order = await db.orders.update(tx.id, { status: 'paid' });
      const shipment = await shipOrder(order);

      // Attach fulfillment data - server creates signed receipt
      await tx.attach('fulfillment', {
        orderId: order.id,
        trackingNumber: shipment.tracking,
        estimatedDelivery: shipment.eta,
      });
      break; // Done with this transaction
    }
  }
}
import { MoneyMQ } from '@moneymq/sdk';

const moneymq = new MoneyMQ({
  endpoint: 'http://localhost:8488',
  secret: process.env.MONEYMQ_SECRET,
});

// Create a payment stream (listens for all new transactions)
const stream = moneymq.payment.paymentStream();

// Called for each new transaction
stream.on('transaction', (tx) => {
  console.log('New transaction:', tx.id, tx.payment?.amount, tx.basket[0]?.productId);

  // Get a hook scoped to this transaction's channel
  const hook = tx.hook();

  hook.on('payment:settled', async (event) => {
    // Process the payment
    const order = await db.orders.update(tx.id, { status: 'paid' });
    const shipment = await shipOrder(order);

    // Attach fulfillment data - server creates signed receipt
    await hook.attach('fulfillment', {
      orderId: order.id,
      trackingNumber: shipment.tracking,
      estimatedDelivery: shipment.eta,
    });
  });
});

stream.on('connected', () => console.log('PaymentStream connected'));
stream.on('error', (err) => console.error('PaymentStream error:', err));

stream.connect();

Transaction Methods

Transactions have built-in methods for receiving events and attaching fulfillment data:

Receiving Events

for await (const tx of stream.transactions()) {
  for await (const event of tx.events()) {
    if (event.type === 'payment:settled') {
      // Handle payment...
    }
  }
}

Attaching Data

Attach fulfillment data to the transaction. The server creates a signed receipt JWT and emits a transaction:completed event to all listeners:

// Attach order fulfillment data with a key
await tx.attach('fulfillment', {
  orderId: order.id,
  trackingNumber: shipment.tracking,
  estimatedDelivery: shipment.eta,
});

Advanced: Manual Hook Control

For fine-grained control, you can create a hook manually:

const hook = tx.hook({ replay: 10 }); // Custom options
for await (const event of hook.events()) {
  // ...
}

Event Data Structure

Events follow a consistent envelope format:

{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "type": "payment:settled",
  "data": {
    "payer": "7xKXtg2CW87d97TXJSDpbD5jBkheTqA83TZRuJosgAsU",
    "amount": "1000000",
    "network": "Solana",
    "transactionSignature": "5K9...",
    "productId": "prod_xxxxx"
  },
  "time": "2024-01-15T10:30:00Z"
}

Channel IDs

Channels are identified by a unique transaction ID. Each payment transaction gets its own channel, allowing frontend and backend to communicate about that specific transaction.

// Subscribe to events for a specific transaction
const listener = moneymq.payment.listener('tx-abc123');

The transaction ID is automatically generated by the payment middleware and included in the payment:verified and payment:settled events.


Wildcard Listeners

Listen to all events on a channel:

listener.on('*', (event) => {
  console.log('Any event:', event.type, event.data);
});

Stateful vs Stateless Streams

MoneyMQ supports two streaming modes:

ModeDescriptionUse Case
StatefulServer tracks your position via streamIdProduction apps, guaranteed delivery
StatelessClient uses replay for recent eventsSimple integrations, debugging

With stateful streams, the server remembers your position. If you disconnect, missed events are automatically replayed when you reconnect.

const listener = moneymq.payment.listener('tx-abc123', {
  streamId: 'checkout-widget-user-123', // Unique stream ID for cursor tracking
});

// Events are automatically replayed on reconnect
for await (const event of listener.events()) {
  if (event.type === 'payment:settled') {
    console.log('Payment:', event.data.amount);
  }
}

Stream ID Best Practices

Use a deterministic, unique ID for each consumer:

// Per-user widget
streamId: `checkout-widget-${userId}`

// Per-session dashboard
streamId: `dashboard-${sessionId}`

// Shared backend worker
streamId: `order-processor-worker-1`

Stateless Streams

For simpler integrations, use stateless streams where you manage recovery:

const listener = moneymq.payment.listener('tx-abc123', {
  replay: 10, // Replay last 10 events on connect
});

for await (const event of listener.events()) {
  if (event.type === 'payment:settled') {
    // Store cursor locally for manual recovery
    localStorage.setItem('lastEventId', event.id);
  }
}

Connection Options

OptionDefaultDescription
streamId-Stream ID for server-side cursor tracking (stateful)
replay0Replay last N events on connect (stateless)
autoReconnecttrueAuto-reconnect on disconnect
reconnectDelay1000Delay between reconnect attempts (ms)
maxReconnectAttempts0Max reconnect attempts (0 = infinite)
const listener = moneymq.payment.listener('tx-123', {
  streamId: 'my-unique-consumer-id', // Stateful mode
  replay: 10,                         // Initial replay on first connect
  autoReconnect: true,
  reconnectDelay: 2000,
  maxReconnectAttempts: 5,
});

// Use with async iterator
for await (const event of listener.events()) {
  // ...
}

Cancellation

Cancel a stream using an AbortController:

const controller = new AbortController();

// Cancel the stream after 30 seconds
setTimeout(() => controller.abort(), 30000);

for await (const event of listener.events({ signal: controller.signal })) {
  // ...
}

REST API Endpoints

Channel SSE Stream

GET /payment/v1/channels/{channelId}
  Query params:
    - replay: number (optional) - replay last N events
    - token: string (optional) - auth token for actors

  Response: SSE stream

Publish Event

POST /payment/v1/channels/{channelId}/events
  Headers:
    - Authorization: Bearer {secret}
    - Content-Type: application/json

  Body:
    {
      "type": "order:completed",
      "data": { "orderId": "...", "trackingNumber": "..." }
    }

  Response: 201 Created

Transactions Stream

GET /payment/v1/channels/transactions
  Query params:
    - token: string (required) - auth token

  Response: SSE stream of transaction notifications

Error Handling

Use the callback API to handle errors separately from the event stream:

const listener = moneymq.payment.listener('tx-abc123');

// Handle errors separately
listener.on('error', (error) => {
  if (error.code === 'UNAUTHORIZED') {
    // Invalid or missing secret
  } else if (error.code === 'CONNECTION_LOST') {
    // Network error, will auto-reconnect if enabled
  } else if (error.code === 'PARSE_ERROR') {
    // Failed to parse event
  }
});

// Stream events (errors handled above)
for await (const event of listener.events()) {
  // Process events...
}

Next Steps

On this page