Kafka installation on Windows, with a producer and consumer in .NET


Kafka® is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.






Kafka has four core APIs:
  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.

To learn more about Kafka, see https://kafka.apache.org/intro.html

To install Kafka on a Windows machine, you first have to download the following software.

Prerequisites:





Run the following commands from the bin\windows folder of the Kafka installation (the ../../config paths are relative to it).

Run ZooKeeper: zookeeper-server-start.bat ../../config/zookeeper.properties


Run Kafka Server: kafka-server-start.bat ../../config/server.properties


Create Topic: kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test


List Topics: kafka-topics.bat --list --zookeeper localhost:2181
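
The same topic can also be created and listed from .NET instead of the command line. The following is a minimal sketch, assuming the Confluent.Kafka NuGet package and a broker listening on localhost:9092; the class name CreateTopicSketch is made up for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

class CreateTopicSketch
{
    static async Task Main()
    {
        // Assumption: the Kafka broker (not ZooKeeper) is reachable on port 9092.
        var adminConfig = new AdminClientConfig { BootstrapServers = "localhost:9092" };

        using (var admin = new AdminClientBuilder(adminConfig).Build())
        {
            try
            {
                // Same settings as the kafka-topics.bat --create command: 1 partition, replication factor 1.
                await admin.CreateTopicsAsync(new[]
                {
                    new TopicSpecification { Name = "test", NumPartitions = 1, ReplicationFactor = 1 }
                });
                Console.WriteLine("Topic created.");
            }
            catch (CreateTopicsException e)
            {
                // Thrown, for example, when the topic already exists.
                Console.WriteLine($"Could not create topic: {e.Results[0].Error.Reason}");
            }

            // Equivalent of kafka-topics.bat --list: print every topic name the broker reports.
            var metadata = admin.GetMetadata(TimeSpan.FromSeconds(10));
            foreach (var topic in metadata.Topics)
            {
                Console.WriteLine(topic.Topic);
            }
        }
    }
}
```

Unlike the command-line tool above, the admin client talks to the broker directly, so it needs the broker address rather than the ZooKeeper address.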

Create Producer (clients connect to the Kafka broker on port 9092, not to ZooKeeper on 2181):

kafka-console-producer.bat --broker-list localhost:9092 --topic test

Create Consumer:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning


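The following simple code, which uses the Confluent.Kafka NuGet package, sends each line you type to a Kafka topic. If the topic does not already exist, the broker creates it automatically, provided auto.create.topics.enable is left at its default of true.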


using System;
using Confluent.Kafka;

var conf = new ProducerConfig { BootstrapServers = "localhost:9092" };

// Reports the outcome of each send once the broker responds.
Action<DeliveryReport<Null, string>> handler = r =>
    Console.WriteLine(!r.Error.IsError
        ? $"Delivered message to {r.TopicPartitionOffset}"
        : $"Delivery Error: {r.Error.Reason}");

using (var p = new ProducerBuilder<Null, string>(conf).Build())
{
    // Send each line typed on the console; an empty line (or end of input) stops the loop.
    string text;
    while (!string.IsNullOrEmpty(text = Console.ReadLine()))
    {
        p.Produce("testin", new Message<Null, string> { Value = text }, handler);
    }

    // Wait up to 10 seconds for in-flight messages to be delivered.
    p.Flush(TimeSpan.FromSeconds(10));
}
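
If you would rather wait for each message to be acknowledged than use a delivery callback, the client also offers ProduceAsync. The following is a minimal sketch, assuming the same Confluent.Kafka package, broker address, and "testin" topic as above; the class name and the "hello" payload are made up for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class AwaitedProducerSketch
{
    static async Task Main()
    {
        var conf = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using (var p = new ProducerBuilder<Null, string>(conf).Build())
        {
            try
            {
                // Completes once the broker has acknowledged the message.
                var result = await p.ProduceAsync(
                    "testin", new Message<Null, string> { Value = "hello" });
                Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
            }
            catch (ProduceException<Null, string> e)
            {
                // Delivery failures surface as an exception instead of a callback argument.
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }
    }
}
```

Awaiting every send is simpler to reason about but slower under load, because each message waits for its own acknowledgement; the callback style with Produce is usually the better fit for high-throughput loops.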



The following code reads messages from the same Kafka topic until you press Ctrl+C.

using System;
using System.Threading;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "MyGroup",
    // Start from the oldest message when the group has no committed offset.
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
{
    c.Subscribe("testin");

    // Ctrl+C cancels the token instead of killing the process, so the consumer can close cleanly.
    CancellationTokenSource cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cts.Cancel();
    };

    try
    {
        while (true)
        {
            try
            {
                var cr = c.Consume(cts.Token);
                Console.WriteLine($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'.");
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occurred: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Leave the consumer group cleanly before exiting.
        c.Close();
    }
}

Please comment if you have any questions.

Thanks
