Apache Iggy
SDKPython

Examples

Working examples are available in the examples/python directory.

  • basic - producer and consumer using connection strings
  • getting-started - producer and consumer with TLS support

Producer

import asyncio
from apache_iggy import IggyClient
from apache_iggy import SendMessage as Message

STREAM_NAME = "sample-stream"
TOPIC_NAME = "sample-topic"
PARTITION_ID = 0

async def main():
    client = IggyClient.from_connection_string(
        "iggy+tcp://iggy:iggy@127.0.0.1:8090"
    )
    await client.connect()

    await client.create_stream(name=STREAM_NAME)
    await client.create_topic(
        stream=STREAM_NAME,
        partitions_count=1,
        name=TOPIC_NAME,
        replication_factor=1,
    )

    messages = []
    for i in range(10):
        messages.append(Message(f"message-{i}"))

    await client.send_messages(
        stream=STREAM_NAME,
        topic=TOPIC_NAME,
        partitioning=PARTITION_ID,
        messages=messages,
    )

asyncio.run(main())

Consumer

import asyncio
from apache_iggy import IggyClient, PollingStrategy, ReceiveMessage

STREAM_NAME = "sample-stream"
TOPIC_NAME = "sample-topic"
PARTITION_ID = 0

async def main():
    client = IggyClient.from_connection_string(
        "iggy+tcp://iggy:iggy@127.0.0.1:8090"
    )
    await client.connect()

    polled_messages = await client.poll_messages(
        stream=STREAM_NAME,
        topic=TOPIC_NAME,
        partition_id=PARTITION_ID,
        polling_strategy=PollingStrategy.Next(),
        count=10,
        auto_commit=True,
    )

    for message in polled_messages:
        payload = message.payload().decode("utf-8")
        print(f"Offset: {message.offset()}, Payload: {payload}")

asyncio.run(main())

For the full source code, see the examples/python directory.

On this page