The Wayback Machine - https://web.archive.org/web/20201122205106/https://github.com/eaba/SharpPulsar
Skip to content
master
Go to file
Code

Latest commit

 

Git stats

Files

Permalink
Failed to load latest commit information.
Type
Name
Latest commit message
Commit time
 
 
 
 
 
 
 
 
 
 

README.md

SharpPulsar

SharpPulsar is Apache Pulsar Client built using Akka.net.

What Is Akka.Net?

Akka.NET is a professional-grade port of the popular Java/Scala framework Akka distributed actor framework to .NET.

What Is Apache Pulsar?

Pulsar is a distributed pub-sub messaging platform with a very flexible messaging model and an intuitive client API. Supported pulsar cluster versions: 2.5+

Supported features

  • Basic Producer/Consumer API
  • Partitioned Topics Producer
  • Producer Broadcast Group - broadcast single message to multiple topics including partitioned topics
  • Bulk Message Producer
  • Batching
  • Multi-topics Consumer
  • Topics Regex Consumer
  • Reader API
  • Schema
  • Large Message Chunking
  • End-To-End Message Encryption
  • Proxy support(SNI supporteed)
  • Consumer seek
  • Compression
  • TLS
  • Authentication (token, tls, OAuth2)
  • Key_shared
  • key based batcher
  • Negative Acknowledge
  • Delayed Delivery Messages
  • User defined properties producer/consumer(or message tagging)
  • Interceptors
  • Routing (RoundRobin, ConsistentHashing, Broadcast, Random)
  • Pulsar SQL
  • Pulsar Admin/Function API
  • EventSource(Reader API/Presto SQL)
  • More...

Getting Started

Install the NuGet package SharpPulsar and follow the Sample.

Usage

1 - Startup SharpPulsar:

 var clientConfig = new PulsarClientConfigBuilder()
                .ServiceUrl(endPoint)
                .ConnectionsPerBroker(1)
                //If Broker is behind proxy, set that in the Client Configuration
                .UseProxy(true)
                .OperationTimeout(opto)
                .AllowTlsInsecureConnection(false)
                .ProxyServiceUrl(url, ProxyProtocol.SNI)
                .Authentication( new AuthenticationDisabled())                             //.Authentication(AuthenticationFactory.Token("eyJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJzaGFycHB1bHNhci1jbGllbnQtNWU3NzY5OWM2M2Y5MCJ9.lbwoSdOdBoUn3yPz16j3V7zvkUx-Xbiq0_vlSvklj45Bo7zgpLOXgLDYvY34h4MX8yHB4ynBAZEKG1ySIv76DPjn6MIH2FTP_bpI4lSvJxF5KsuPlFHsj8HWTmk57TeUgZ1IOgQn0muGLK1LhrRzKOkdOU6VBV_Hu0Sas0z9jTZL7Xnj1pTmGAn1hueC-6NgkxaZ-7dKqF4BQrr7zNt63_rPZi0ev47vcTV3ga68NUYLH5PfS8XIqJ_OV7ylouw1qDrE9SVN8a5KRrz8V3AokjThcsJvsMQ8C1MhbEm88QICdNKF5nu7kPYR6SsOfJJ1HYY-QBX3wf6YO3VAF_fPpQ"))
                .ClientConfigurationData;

var pulsarSystem = PulsarSystem.GetInstance(clientConfig);

2 - Create Producer and Send Messages:

           var jsonSchem = AvroSchema.Of(typeof(Students));
            var producerListener = new DefaultProducerListener((o) =>
            {
                Console.WriteLine(o.ToString());
            }, s =>
            {
                Receipts.Add(s);
            });
            var producerConfig = new ProducerConfigBuilder()
                .ProducerName($"Test-{topic}-{Guid.NewGuid()}")
                .Topic(topic)
                .Schema(jsonSchem)
                .EnableChunking(true)
                .EventListener(producerListener)
                .ProducerConfigurationData;

            var t = pulsarSystem.PulsarProducer(new CreateProducer(jsonSchem, producerConfig));            
            Console.WriteLine($"Acquired producer for topic: {t.Topic}");
            SendMessages(system, topic, t.Producer, key, value, tag);

3 - You can Consume messages in one of two ways(Listener or Queue). ConsumptionType.Queue let you pull messages forever or with limit by supplying a takeCount of -1 or {number of messages} to pull, respectively:

           var consumerListener = new DefaultConsumerEventListener(Console.WriteLine);
            var messageListener = new DefaultMessageListener(StudentHandler, null);
            var jsonSchem = AvroSchema.Of(typeof(Students));
            var topicLast = topic.Split("/").Last();
            var consumerConfig = new ConsumerConfigBuilder()
                .ConsumerName(topicLast)
                .ForceTopicCreation(true)
                .SubscriptionName($"{topicLast}-Subscription")
                .Topic(topic)
                //.AckTimeout(10000)
                .AcknowledgmentGroupTime(0)
                .ConsumerEventListener(consumerListener)
                .SubscriptionType(CommandSubscribe.SubType.Exclusive)
                .Schema(jsonSchem)
                .SetConsumptionType(ConsumptionType.Queue)
                .MessageListener(messageListener)
                .StartMessageId(MessageIdFields.Earliest)
                .SubscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .ConsumerConfigurationData;
            system.PulsarConsumer(new CreateConsumer(jsonSchem, consumerConfig));

4 - Explore more

5 - It is a big sin not to report any difficulties or issues being experienced using SharpPulsar

License

This project is licensed under the Apache License Version 2.0 - see the LICENSE file for details.

About

SharpPulsar is Apache Pulsar client built with Akka.net, inspired from the Pulsar java client. Enjoy!

Topics

Resources

License

Packages

No packages published

Languages

You can’t perform that action at this time.