Version 3.0

Kafka » Overview

ROM supports Apache Kafka via the rom-kafka adapter, which is built on top of the poseidon Ruby driver.

Until v0.1.0 the adapter is still in alpha. If you find any inconsistency, feel free to ask in the ROM chatroom and report issues on GitHub.


The adapter provides access to Kafka brokers in much the same way as other adapters do for their corresponding datastores, with some specifics:

  • By the very nature of Kafka, it allows only creating (publishing) messages and reading (consuming) them. No 'update' or 'delete' commands are available.

  • Reading messages from Kafka also differs from what you'd expect from a database. Kafka only supports reading a sequence of messages from a topic's partition, starting from some offset. You can neither reorder messages nor filter them in any way; those operations are up to the domain application. All you can define is the topic (relation), its partition, the initial offset, and a limit on the number of messages to return.
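Since messages come back in log order only, any filtering or reordering is done in application code after reading. The following is a minimal sketch in plain Ruby. It assumes tuples shaped like the hashes a relation returns (value, topic, key, offset); the sample data, the TUPLES constant, and the helper names with_key and newest_first are hypothetical and not part of the adapter.

```ruby
# Hypothetical sample data, shaped like tuples returned by a relation.
TUPLES = [
  { value: "Hi!",    topic: "greetings", key: "en", offset: 0 },
  { value: "Hola!",  topic: "greetings", key: "es", offset: 1 },
  { value: "Hello!", topic: "greetings", key: "en", offset: 2 }
].freeze

# Application-side filter: keeps only tuples with the given key.
def with_key(tuples, key)
  tuples.select { |tuple| tuple[:key] == key }
end

# Application-side reordering: newest message (highest offset) first.
def newest_first(tuples)
  tuples.sort_by { |tuple| -tuple[:offset] }
end

with_key(TUPLES, "en").map { |tuple| tuple[:value] }   # => ["Hi!", "Hello!"]
newest_first(TUPLES).map { |tuple| tuple[:offset] }    # => [2, 1, 0]
```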


Set up a Kafka gateway in the generic ROM way. When setting up a gateway you have to specify the client_id and a list of Kafka brokers. Brokers can be set in the following ways:

# by default (connects to host 'localhost', port 9092)
ROM.container(:kafka, client_id: :admin)

# with inline address (host:port)
ROM.container(:kafka, 'localhost:9092', client_id: :admin)

# ...or a list of addresses
ROM.container(:kafka, '', '', client_id: :admin)

# with explicit array of `hosts` and `port`
ROM.container :kafka,
  hosts:     ['', ''],
  port:      9092,
  client_id: :admin

# or their combination (the same as '', '')
ROM.container :kafka, '',
  hosts:     [''],
  port:      9092,
  client_id: :admin

Additional options

In addition to brokers and client_id you can use the following options:

| Attribute | Type | Default value | Description |
|---|---|---|---|
| :partitioner | Proc, nil | nil | A proc used to define partition by key. |
| :compression_codec | :gzip, :snappy, nil | nil | The type of compression to be used. |
| :metadata_refresh_interval_ms | Integer | 600_000 | How frequently the topic metadata should be updated (in milliseconds). |
| :max_send_retries | Integer | 3 | The number of times to retry sending of messages to a Kafka leader. |
| :retry_backoff_ms | Integer | 100 | The amount of time (in milliseconds) to wait before refreshing the metadata after we are unable to send messages. |
| :required_acks | Integer | 0 | The number of acks required per request. |
| :ack_timeout_ms | Integer | 1_500 | How long the producer waits for acks. |
| :socket_timeout_ms | Integer | 10_000 | How long the producer/consumer socket waits for any reply from server. |
| :min_bytes | Integer | 1 | The smallest amount of data the server should send (by default, data is sent as soon as it is ready). |
| :max_bytes | Integer | 1_048_576 | The maximum number of bytes to fetch by consumer (1MB by default). |
| :max_wait_ms | Integer | 100 | How long to block until the server sends data. This is only enforced if min_bytes is > 0. |


With the :partitioner option you can specify a procedure that selects a partition by key. The procedure should take 2 arguments, the key and the number of partitions, and return the integer number of a partition.

In the following example a message is added to a partition chosen by the number of letters in its key:

rom = ROM.container :kafka, '',
  client_id:   :admin,
  partitioner: -> key, number { key.length % number }

insert = rom.command(:items).create

# Suppose the topic "items" has 2 partitions (0 and 1).
# Messages "bar" and "baz" will be added to partition 1 ("foo".length % 2 = 1).
insert.with(key: "foo").call "bar", "baz"
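Because a partitioner is just a proc that takes a key and a partition count, its contract can be checked in isolation, without a broker. Below is a minimal sketch in plain Ruby. The constant names BY_KEY_LENGTH and BY_CRC32 are hypothetical, and the CRC32 variant is an illustrative alternative rather than anything the adapter ships with.

```ruby
require "zlib"

# Hypothetical partitioner: picks a partition from the key length.
# Takes the key and the number of partitions, returns an Integer.
BY_KEY_LENGTH = ->(key, partitions) { key.to_s.length % partitions }

# Hypothetical alternative: a checksum spreads keys more evenly, and
# unlike String#hash it gives the same result in every Ruby process.
BY_CRC32 = ->(key, partitions) { Zlib.crc32(key.to_s) % partitions }

BY_KEY_LENGTH.call("foo", 2)    # => 1
BY_KEY_LENGTH.call("quux", 2)   # => 0

# Always an Integer in 0...partitions, stable for a given key.
BY_CRC32.call("foo", 3)
```

Either proc could then be passed as the :partitioner option.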


To use snappy compression, install the snappy gem, or simply add gem 'snappy' to your project's Gemfile.


In ROM::Kafka the relation describes a topic. You can read messages from a specific partition from a specified offset. By default both the partition and initial offset are set to 0.

To define a Kafka relation, follow the standard way of defining relations in ROM.

class Greetings < ROM::Relation[:kafka]
  topic :greetings # kafka-specific alias for `relation :greetings`
end

To define relations that are exposed to your application, you can define your own methods using dataset modifiers:

  • #from to define a partition to read data from (0 by default).
  • #offset to define the offset to start reading from (0 by default).
  • #limit to define a number of messages to be fetched.
  • #using to modify any option of the setup.

The relation call method returns an array of tuples with 4 keys:

  • value for the message.
  • topic for the current topic.
  • key for the current key.
  • offset for the offset of the current message.

# After the setup
rom = ROM.container(:kafka, '', client_id: :admin)

greetings = rom.relation(:greetings)

# Selects all messages from the (default) partition 0
greetings.call.to_a
# => [
#      { value: "Hi!", topic: "greetings", key: nil, offset: 0 },
#      { value: "Hello!", topic: "greetings", key: nil, offset: 1 }
#    ]


By default messages are read from partition 0. You can explicitly select the partition to read from:

# Will read all messages from the partition 1 of the "greetings" topic
greetings.from(1).call.to_a

Using options

Kafka allows reading messages from a given offset. Messages are fetched in chunks: you can set the minimum and maximum chunk size (in bytes), as well as how long to wait for the server to respond.

These options can be set for a gateway during the setup phase:

rom = ROM.container :kafka,
  client_id: :admin,
  min_bytes: 1_024,  # ignore data smaller than 1 KB
  max_bytes: 10_240, # read no more than 10 KB at once
  max_wait_ms: 100   # wait for a response no longer than 100 ms

or you can update them with the using method:

# read all messages regardless of their length, and wait up to one second for a response
greetings.from(0).using(min_bytes: 1, max_wait_ms: 1_000).call.to_a


When messages are read from a topic's partition, reading stops at some offset. This can be the offset of the last message (at the time of reading).

If you make another call some time later, it starts reading from the next offset, so you get only new messages.

greetings = rom.relation(:greetings)

greetings.call.to_a
# => [
#      { value: "Hi",    topic: "greetings", key: nil, offset: 0 },
#      { value: "Hello", topic: "greetings", key: nil, offset: 1 }
#    ]

greetings.call.to_a
# => [] (because all messages have been read during the first call)

greetings.call.to_a
# => [
#      { value: "Hola", topic: "greetings", key: nil, offset: 2 }
#    ]
# (only messages added after the previous call)

If you need to restart reading from a specific offset, you can do it by setting the offset explicitly:

greetings.offset(1).call.to_a
# => [
#      { value: "Hello", topic: "greetings", key: nil, offset: 1 },
#    ]

You can use info from the last extracted tuple to define an offset, from which to start the next time.
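One way to derive the next starting offset from the last tuple is sketched below in plain Ruby. It assumes tuples carry an :offset key, as in the results shown above; the helper name next_offset is hypothetical and not part of the adapter.

```ruby
# Returns the offset to resume from: one past the last tuple read,
# or the current offset unchanged when the batch was empty.
def next_offset(tuples, current_offset)
  last = tuples.last
  last ? last[:offset] + 1 : current_offset
end

batch = [
  { value: "Hi",    topic: "greetings", key: nil, offset: 0 },
  { value: "Hello", topic: "greetings", key: nil, offset: 1 }
]

next_offset(batch, 0)   # => 2
next_offset([], 2)      # => 2
```

The returned value can then be passed to the #offset modifier on the next read.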


You can define the maximum number of messages to return, using the limit method:

greetings = rom.relation(:greetings)

greetings.offset(1).limit(2).call.to_a
# => [
#      { value: "Hello", topic: "greetings", key: nil, offset: 1 },
#      { value: "Hola",  topic: "greetings", key: nil, offset: 2 }
#    ]

But be careful. The actual amount of data read is defined by the :max_bytes setting, not the limit.

For example, when you set limit(2), the relation can actually fetch a chunk of 5 messages (and move the next offset correspondingly). If you continue reading, you'll miss 3 messages. That's why it is strongly recommended to set the offset explicitly after using the limit modifier.

This is unsafe (can cause missing messages):

greetings = rom.relation(:greetings).limit(1)

greetings.call.to_a
# => [
#      { value: "Hi",    topic: "greetings", key: nil, offset: 0 },
#    ]

greetings.call.to_a
# => []

while this is safer:

greetings.offset(0).call.to_a
# => [
#      { value: "Hi",    topic: "greetings", key: nil, offset: 0 },
#    ]

greetings.offset(1).limit(2).call.to_a
# => [
#      { value: "Hello", topic: "greetings", key: nil, offset: 1 },
#      { value: "Hola",  topic: "greetings", key: nil, offset: 2 }
#    ]

Also notice that every time you use a modifier, the connection is re-established. That's why the rule of thumb is either not to use modifiers at all, or to set the offset explicitly for every call.
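To see why a limit can skip messages, here is a small in-memory model in plain Ruby. It is not the adapter's implementation: the ChunkedReader class, its chunk_size parameter (standing in for :max_bytes), and the LOG sample data are all hypothetical. The model only assumes what is described above, namely that a whole chunk is fetched and the next offset moves past all of it, while the caller sees at most `limit` messages.

```ruby
# Hypothetical stand-in for a consumer that fetches messages in chunks.
class ChunkedReader
  attr_reader :next_offset

  def initialize(log, chunk_size:)
    @log = log
    @chunk_size = chunk_size
    @next_offset = 0
  end

  # Fetches a whole chunk, advances past all of it,
  # and returns at most `limit` messages from it.
  def read(limit:, offset: nil)
    @next_offset = offset if offset
    chunk = @log[@next_offset, @chunk_size] || []
    @next_offset += chunk.size
    chunk.first(limit)
  end
end

LOG = %w[Hi Hello Hola Salut Ciao].freeze

# Unsafe: relying on the implicit offset after a limited read.
unsafe = ChunkedReader.new(LOG, chunk_size: 5)
unsafe.read(limit: 2)   # => ["Hi", "Hello"]
unsafe.read(limit: 2)   # => [] ("Hola", "Salut", "Ciao" were skipped)

# Safer: setting the offset explicitly for every call.
safe = ChunkedReader.new(LOG, chunk_size: 5)
safe.read(limit: 2, offset: 0)   # => ["Hi", "Hello"]
safe.read(limit: 2, offset: 2)   # => ["Hola", "Salut"]
```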


Kafka supports the Create command only. You can only add immutable messages to the log; you cannot change or delete them.

ROM::Kafka provides two helpers for commands: #where and #using.

class Greet < ROM::Commands::Create[:kafka]
  relation :greetings
  register_as :greet

  def execute(name)
    super where(key: "foo").using(socket_timeout_ms: 10)
  end
end

Here the where modifier requires only one value, the :key. The using modifier accepts any options you pass, but ignores unused ones. It is recommended to use only those keys that are defined for the producer:

  • partitioner
  • compression_codec
  • metadata_refresh_interval_ms
  • max_send_retries
  • retry_backoff_ms
  • required_acks
  • ack_timeout_ms
  • socket_timeout_ms

In case of success the command returns an array of messages added to Kafka:

# After the setup
rom = ROM.container(:kafka, '', client_id: :admin)
greet = rom.command(:greetings).greet
greet.call "Hi, Joe", "How're you?"
# => [
#      { value: "Hi, Joe",     topic: "greetings", key: 0 },
#      { value: "How're you?", topic: "greetings", key: 0 }
#    ]

The producer and consumer connections to Kafka brokers are separate. A command doesn't read the messages it writes, and knows nothing about the partitions and offsets that were assigned by the server. You have to read them explicitly if you need them.


Mappers can be applied to relations and commands in a standard ROM way.