Erlang kafka driver based on librdkafka
The library is implemented in top of librdkafka
which is a C library implementation of the Apache Kafka protocol
designed with message delivery reliability and high performance in mind, current figures exceed 1 million msgs/second
for the producer and 3 million msgs/second for the consumer.
It's well known that NIF's can affect the Erlang schedulers performances in case the functions are not returning in less than 1-2 ms and blocks the scheduler threads.
Because the librdkafka
driver is async, erlkaf won't block the scheduler threads and all calls to the native functions
will return immediately. The librdkafka
driver use it's own thread pool for managing the requests. Also each client
has it's own thread from where is sending async the events (delivery reports, logs, statistics) to erlang
using enif_send
.
Version 2.0.0 contains a major rewrite of the consumer groups in order to be able to scale when you have lot of topics:
-
In version 1.x each consumer group or producer is creating one dedicated native thread that's used to deliver the callbacks. Starting 2.0.0 no matter how many producers or consumer groups you have, one single thread will be used to deliver the callbacks.
-
In version 1.x each consumer group can handle a list of topics but you cannot specify a callback for each topic. Now the callbacks settings moved into the topics list. This breaks the backward compatibility for the API and config. More details into Changelog.
On Ubuntu make sure you have installed :
sudo apt-get install libsasl2-dev liblz4-dev libzstd-dev
Add erlkaf
as a dependency to your project. The library works with rebar
, rebar3
or hex.pm
{deps, [
{erlkaf, ".*", {git, "https://github.com/silviucpp/erlkaf.git", "master"}},
}.
Using sys.config
you can have all clients (producers/consumers) started by default (by application controller)
Example of a configuration file (for sys.config
):
{erlkaf, [
{global_client_options, [
{bootstrap_servers, <<"broker1.com:9092,broker2.com:9092">>},
]},
{clients, [
{client_producer_id, [
{type, producer},
{topics, [
{<<"benchmark">>, [{request_required_acks, 1}]}
]},
{client_options, [
{queue_buffering_max_messages, 10000}
]}
]},
{client_consumer_id, [
{type, consumer},
{group_id, <<"erlkaf_consumer">>},
{topics, [
{<<"benchmark">>, [
{callback_module, module_topic1},
{callback_args, []},
{dispatch_mode, one_by_one}
]}
]},
{topic_options, [
{auto_offset_reset, smallest}
]},
{client_options, [
{offset_store_method, broker}
]}
]}
]}
]}
global_client_options
will apply to all clients defined. In case the global property it's defined as well in the client
options it's value will be overwritten.
For producers in case you don't need to customize the topic properties you can omit the topics
property as time they will
be created on the first produce operation with default settings.
The following example will create a producer client with id client_producer
which sends also delivery reports to the same module.
In order to receive the delivery reports you need to implement the erlkaf_producer_callbacks
protocol or to setup a function with
arity 2 into delivery_report_callback
config (for example: {delivery_report_callback, fun(DeliveryStatus, Message) -> .. end}
).
The function specified into delivery report callback is called async from another process (each producer has it's own process from where it's dispatching the delivery reports)
In case you want to define any properties to the topic where you are going to produce messages, you need to create the topic object attached to the client. To do this you can use the
erlkaf:create_topic
method. This needs to be done before any produce operation which will lead in create the topic with default settings.
-module(test_producer).
-define(TOPIC, <<"benchmark">>).
-export([
delivery_report/2,
create_producer/0,
produce/2
]).
-behaviour(erlkaf_producer_callbacks).
delivery_report(DeliveryStatus, Message) ->
io:format("received delivery report: ~p ~n", [{DeliveryStatus, Message}]),
ok.
create_producer() ->
erlkaf:start(),
ProducerConfig = [
{bootstrap_servers, <<"broker1:9092">>},
{delivery_report_only_error, false},
{delivery_report_callback, ?MODULE}
],
ok = erlkaf:create_producer(client_producer, ProducerConfig),
ok = erlkaf:create_topic(client_producer, ?TOPIC, [{request_required_acks, 1}]).
produce(Key, Value) ->
ok = erlkaf:produce(client_producer, ?TOPIC, Key, Value).
You can call those like:
ok = test_producer:create_producer().
test_producer:produce(<<"key1">>, <<"val1">>).
And you will get into the console the delivery reports:
received delivery report: {ok, {erlkaf_msg,<<"benchmark">>,4,6172,<<"key1">>,<<"val1">>}}
In case you are not interested in the delivery reports don't specify any callback, or in case you want to receive the
delivery reports only in case of errors you have to specify a callback and set delivery_report_only_error
on true
.
The produced messages are queued in memory based on (queue_buffering_max_messages
and queue_buffering_max_kbytes
),
until they are delivered and acknowledged by the kafka broker, once the memory queue it's full there are three options defined by (queue_buffering_overflow_strategy
) :
local_disk_queue
(default) - records are persisted on local disk and once there is enough space are sent to memory queueblock_calling_process
- calling process it's blocked until there is enough room into the memory queuedrop_records
- records are dropped in case the memory queue is full
The following example creates a consumer group that will consume messages from benchmark
topic. For each topic and partition
the application will spawn an erlang process that will pull the messages.
Each time the rebalance process takes place the process it's restarted so the init/4
method will be called again. In case the
handle_message/2
it's returning {ok, State}
then the message is considered processed and the offset is stored to be committed
based on auto_commit_interval_ms
setting. In case you want to mark an error but also to update the state you can use {error, Reason, NewState}
,
basically the event for the same message will be triggered again but with the new state.
-module(test_consumer).
-include("erlkaf.hrl").
-define(TOPICS, [
{<<"benchmark">>, [
{callback_module, ?MODULE},
{callback_args, []}, % default [] (you can skip it)
{dispatch_mode, one_by_one} % default one_by_one (you can skip it)
]}
]).
-export([
create_consumer/0,
init/4,
handle_message/2
]).
-behaviour(erlkaf_consumer_callbacks).
-record(state, {}).
create_consumer() ->
erlkaf:start(),
ClientId = client_consumer,
GroupId = <<"erlkaf_consumer">>,
ClientCfg = [{bootstrap_servers, <<"broker1:9092">>}],
TopicConf = [{auto_offset_reset, smallest}],
ok = erlkaf:create_consumer_group(ClientId, GroupId, ?TOPICS, ClientCfg, TopicConf).
init(Topic, Partition, Offset, Args) ->
io:format("init topic: ~p partition: ~p offset: ~p args: ~p ~n", [
Topic,
Partition,
Offset,
Args
]),
{ok, #state{}}.
handle_message(#erlkaf_msg{topic = Topic, partition = Partition, offset = Offset}, State) ->
io:format("handle_message topic: ~p partition: ~p offset: ~p state: ~p ~n", [
Topic,
Partition,
Offset,
State
]),
{ok, State}.
You can call those like:
ok = test_consumer:create_consumer().
We forward all rdkafka errors to the calling application. To translate the received error, please use get_readable_error/1
on the returned error to get a translated message
In order to get statistics you can register the stats_callback
callback which is called every statistics_interval_ms
(default 0 which means
statistics are disabled). The granularity is 1000 ms. Also you can poll the stats for each client using erlkaf:get_stats/1
.
The list of all available properties of client and topics are described here