Apache Iggy
SDKNode

Examples

Working examples are available in the examples/node directory, written in TypeScript. The following example sets are included:

  • getting-started - basic producer and consumer
  • basic - producer and consumer with utilities
  • message-envelope - JSON message envelope pattern
  • message-headers - custom message headers
  • multi-tenant - multi-tenant streaming setup
  • tcp-tls - TLS-encrypted TCP connections
  • stream-builder - stream builder API usage
  • sink-data-producer - bulk data generation for sink connectors

Producer

import { Client, Partitioning } from 'apache-iggy';

const client = new Client({
  transport: 'TCP',
  options: { port: 8090, host: '127.0.0.1', keepAlive: true },
  credentials: { username: 'iggy', password: 'iggy' },
});

const stream = await client.stream.create({ name: 'sample-stream' });
const topic = await client.topic.create({
  streamId: stream.id,
  name: 'sample-topic',
  partitionCount: 1,
  compressionAlgorithm: 1,
  replicationFactor: 1,
});

const messages = Array.from({ length: 10 }, (_, i) => ({
  id: i + 1,
  headers: [],
  payload: `message-${i + 1}`,
}));

await client.message.send({
  streamId: stream.id,
  topicId: topic.id,
  messages,
  partition: Partitioning.PartitionId(
    topic.partitions[0].id
  ),
});

await client.destroy();

Consumer

import { Client, PollingStrategy, Consumer } from 'apache-iggy';

const STREAM_ID = 1;
const TOPIC_ID = 1;
const PARTITION_ID = 0;

const client = new Client({
  transport: 'TCP',
  options: { port: 8090, host: '127.0.0.1' },
  credentials: { username: 'iggy', password: 'iggy' },
});

const polledMessages = await client.message.poll({
  streamId: STREAM_ID,
  topicId: TOPIC_ID,
  consumer: Consumer.Single,
  partitionId: PARTITION_ID,
  pollingStrategy: PollingStrategy.Offset(BigInt(0)),
  count: 10,
  autocommit: false,
});

for (const message of polledMessages.messages) {
  const payload = message.payload.toString('utf8');
  console.log(
    `Offset: ${message.headers.offset}, Payload: ${payload}`
  );
}

await client.destroy();

For the full source code, see the examples/node directory.

On this page