KAFKA installation on windows and producer and consumer in .net
KAFKA installation on windows and producer and consumer in .net

Kafka® is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.

Kafka has four core APIs:
To know more about Kafka please go over this link:https://kafka.apache.org/intro.html
To install Kafka on a windows machine you have to download the following software.
Prerequisites:
Apache Zookeeper
https://zookeeper.apache.org/releases.html#download
Run Zookeeper:zookeeper-server-start.bat
../../config/zookeeper.properties
Run Kafka Server:kafka-server-start.bat ../../config/server.properties
Create Topic:kafka-topics.bat --create
--zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Topics List:kafka-topics.bat --list --zookeeper
localhost:2181
create Producer
kafka-console-producer.bat
broker-list localhost:2181 --topic
test
Create Consumer
kafka-console-consumer.bat
--bootstrap-server localhost:2181 --topic
test
--from-beginning
var conf = new ProducerConfig { BootstrapServers = "localhost:9092" };
bool KeepTextin = true;
Action<DeliveryReport<Null, string>> handler = r =>
Console.WriteLine(!r.Error.IsError
? $"Delivered message to {r.TopicPartitionOffset}"
: $"Delivery Error: {r.Error.Reason}");
using (var p = new ProducerBuilder<Null, string>(conf).Build())
{
while (KeepTextin)
{
string text = Console.ReadLine();
p.Produce("testin", new Message<Null, string> { Value = text.ToString() }, handler);
}
p.Flush(TimeSpan.FromSeconds(10));
}
following code allows you to read a message from Kafka stream.
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "MyGroup",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
{
c.Subscribe("testin");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true;
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
c.Close();
}
please comment in case of any questions.
Thanks
Comments
Post a Comment