How Functional Programming Makes Working with Kafka Easier
When your system grows and splits into multiple services, you will likely need a way to send messages between those services for processing. In a clean architecture, each service has a bounded context that defines its area of responsibility; everything outside that context, including message delivery, should be handled by other components. A service then only emits a message (fire-and-forget), and other services that are interested in those messages pick them up.
A publish-subscribe message queue is a good fit here. In my opinion, Kafka should not be the default go-to choice for pub-sub because of its complexity; other solutions, such as GCP Pub/Sub, AWS Kinesis, or NATS, are designed to simplify the development and infrastructure work with reasonable trade-offs. However, if none of them meets your requirements and you want to deploy Kafka, there are a few things you need to take care of.
In this blog post, we will build a simple yet powerful Scala client in a functional style that solves some of the hidden difficulties of the Kafka client and potentially simplifies testing, error handling, and other aspects, as you will see later in the companion project.
Kafka with Scala
What is Kafka?
Kafka is "a publish-subscribe based durable messaging system exchanging data between processes, applications, and servers." [1]
Kafka has two sides: a producer that publishes messages to a topic, and a consumer that subscribes to a topic and consumes messages from it. Multiple consumers can subscribe to the same topic and each receive all of its messages, as long as they belong to different consumer groups. Each consumer group maintains an offset that marks the position in the topic up to which it has consumed.
Kafka Clients that Work with Scala
This is not meant to be a full comparison, so it’s enough to list some available clients with a couple of words about each of them.
- Apache Kafka (Java) Client: This is the official Java client. It exposes a Java API, and I couldn't find a Scala wrapper for it that makes it functional, thread-safe, etc.
- CakeSolutions client: A well-written Akka-based client. I used this client myself while I was working at Cake Solutions. My former colleague David Piggott has a nice blog post about it, but that was before the company was acquired by Disney, and I'm not sure how actively it is still maintained.
- Alpakka project: This client is based on Akka Streams. I also used this one in a previous project, and it fits well if you already have an Akka Streams application. It is helpful for designing complex streaming graphs involving many Kafka topics, and it also helps with managing offsets and applying back-pressure.
- Lagom Kafka Client: This is based on the Alpakka client, but it's designed to work with Lagom.
Apache Kafka Client
Let's start with the vanilla Java-based official client and see how we can enhance it with Cats and Cats Effect to make it more compatible with Scala and functional programming.
The simple imperative code looks like this [2]:
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

def consumeFromKafka(topic: String) = {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9094")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("auto.offset.reset", "latest")
  props.put("group.id", "consumer-group")
  val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
  consumer.subscribe(util.Arrays.asList(topic))
  // Blocks the current thread forever, polling and printing messages.
  while (true) {
    val record = consumer.poll(1000).asScala
    for (data <- record.iterator)
      println(data.value())
  }
}
THIS IS TOO JAVA!
Cats with Kafka
Cats is a library that provides type classes and data structures for functional programming in Scala, such as Monad, Semigroup, Monoid, and Applicative. Cats Effect mainly provides the IO monad, which describes an operation that performs a side effect, where the result can be returned synchronously or asynchronously. With Cats and Cats Effect, you can define all the logic of your program without running anything (including operations that have side effects) and then, separately, run the program (or part of it) synchronously or asynchronously. This improves testability and maintainability and helps you understand potential sources of errors.
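As a minimal sketch of this idea (the printed messages are illustrative), note that constructing an IO value has no side effect; nothing happens until you explicitly run the description:

import cats.effect.IO

// `program` merely describes printing; building it prints nothing.
val program: IO[Unit] = for {
  _ <- IO(println("first effect"))
  _ <- IO(println("second effect"))
} yield ()

// Only here, at the "end of the world", do the effects actually run.
program.unsafeRunSync()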
Kafka Context
Apache Kafka Client is not Thread-Safe
As you saw above, the consumer loop is wrapped in a while(true) block that runs in a single thread. For better utilization you need to handle messages on multiple threads, but if you try to call kafkaConsumer.poll from a thread other than the one it is being used on, you will get an exception, because KafkaConsumer is not safe for multi-threaded access. To solve this, we will keep calling kafkaConsumer.poll on the same thread and use another thread pool for handling messages. Let's see how easy this is with Cats Effect.
Cats Effect ContextShift
Cats Effect provides ContextShift as a pure alternative to Scala's ExecutionContext. It allows you to switch thread pools between flatMap operations, so you can do something like this:
val ec: ExecutionContextExecutor = ExecutionContext.global
val cs: ContextShift[IO] = IO.contextShift(ec)

for {
  _ <- cs.shift
  a <- f1
  b <- cs.evalOn(anotherExecutionContext)(f2)
  c <- f3
} yield (a, b, c)
f1 will run on the global execution context; before running f2, cs will switch to anotherExecutionContext, run f2 on it, then switch back to the global execution context and run f3.
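To see the switching in action, here is a small runnable sketch (the extra pool and the labels are illustrative) that prints the name of the thread each step runs on:

import java.util.concurrent.Executors
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val anotherExecutionContext =
  ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

// Reports which thread a step runs on.
def whereAmI(label: String): IO[Unit] =
  IO(println(s"$label runs on ${Thread.currentThread().getName}"))

val demo: IO[Unit] = for {
  _ <- cs.shift
  _ <- whereAmI("f1")                                      // a global-pool thread
  _ <- cs.evalOn(anotherExecutionContext)(whereAmI("f2"))  // the single-thread pool
  _ <- whereAmI("f3")                                      // back on the global pool
} yield ()

demo.unsafeRunSync()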
Kafka Single Thread with Cats Effect
Now, we will define a thread pool with a single thread on which we initialize kafkaConsumer and call .poll and .close, and we will use ContextShift to switch between this single-threaded pool and the thread pool used for handling messages, which can be anything.
KafkaContext.scala
import java.util.concurrent.{ExecutorService, Executors}
import cats.effect.{ContextShift, IO}
import com.google.common.util.concurrent.ThreadFactoryBuilder
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}

class KafkaContext(cs: ContextShift[IO]) {
  // A single-threaded pool: every Kafka client call must run on this thread.
  private val pool: ExecutorService =
    Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("kafka-thread").build())

  protected val synchronousExecutionContext: ExecutionContextExecutor =
    ExecutionContext.fromExecutor(pool)

  // Evaluates `f` on the Kafka thread, then shifts back to the caller's pool.
  def execute[A](f: => A): IO[A] = cs.evalOn(synchronousExecutionContext)(IO(f))

  // Alias for execute
  def ~>[A](f: => A): IO[A] = execute(f)

  def close(): IO[Unit] = IO(pool.shutdown())
}
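The application code later in this post acquires the KafkaContext through a KafkaContext.resource constructor, which the snippet above doesn't show. A minimal companion object providing it could look like this:

import cats.effect.Resource

object KafkaContext {
  // Acquires a KafkaContext and shuts its pool down when released.
  def resource(cs: ContextShift[IO]): Resource[IO, KafkaContext] =
    Resource.make(IO(new KafkaContext(cs)))(_.close())
}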
Building the Consumer
Now, we can create the Kafka consumer on the Kafka thread that we just defined in KafkaContext. You can do this simply by creating a new instance of KafkaConsumer and wrapping it in KafkaContext.execute:
kafkaContext.execute(new KafkaConsumer(...))
Since we need to close the Kafka consumer once we no longer need it, it's more suitable to define it as a closable resource. Cats Effect provides an abstraction for that called Resource, used for acquiring and releasing resources. Its Resource.make function takes two functions, acquire and release, and returns a Resource instance on which you can call use; use lets you work with the resource and makes sure it is released after you're done with it. We can do the same thing with KafkaContext:
def createKafkaConsumer = kafkaContext ~> new KafkaConsumer(...)

def closeKafkaConsumer(kafkaConsumer: KafkaConsumer[...]) = kafkaContext ~> kafkaConsumer.close()

val kafkaConsumerResource = Resource.make(createKafkaConsumer)(closeKafkaConsumer)

kafkaConsumerResource.use { kafkaConsumer =>
  kafkaContext ~> kafkaConsumer.subscribe(Collections.singletonList(config.topic))
}
Here we defined kafkaConsumerResource, which creates and closes the consumer, and we use it on that same Kafka thread. Next, we will hand messages over to another thread pool, typically with more than one thread, for processing.
Handling messages in another thread pool
This is not difficult at all, since ContextShift.evalOn already switches back to the original execution context. The client code that uses kafkaConsumerResource can specify the execution context where the messages are handled. The following code uses a simple for-comprehension to poll messages from the Kafka topic, process them, and then poll the next messages.
def consume: IO[Unit] = for {
  messages <- kafkaContext ~> kafkaConsumer.poll(Duration.ofMillis(1000)).iterator().asScala.toList
  _        <- messageHandler.handleMessage(messages)
  _        <- consume
} yield ()
Here, only kafkaConsumer.poll runs on the Kafka thread; messageHandler.handleMessage(messages) runs on another thread pool, decided by the caller of the consume function. This is the functional alternative to the while(true) loop used in the vanilla example.
Note that we called consume in a non-tail-recursive position, but it is stack-safe, meaning it will not throw a StackOverflowError. Remember that the consume function doesn't actually run anything; it just describes the task, so it lets cats.IO decide how to execute it, and cats.IO.flatMap runs in a trampolined context.
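You can convince yourself of this stack safety with a small sketch (the depth is arbitrary): a deep chain of recursive flatMap calls completes fine, because IO trampolines its execution:

import cats.effect.IO

// Recursion in flatMap position, like `consume` above; not tail-recursive
// at the language level, but IO's trampolined flatMap keeps it stack-safe.
def countDown(n: Int): IO[Unit] =
  if (n <= 0) IO.unit
  else IO(n - 1).flatMap(countDown)

countDown(1000000).unsafeRunSync() // finishes without StackOverflowError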
Putting them all together
Consumer.scala
import java.time.Duration
import java.util.{Collections, Properties}

import cats.effect.{IO, Resource, Timer}
import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig => KafkaConsumerConfig}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext

case class ConsumerConfig(servers: String, topic: String, consumerGroup: String)

class Consumer(messageHandler: MessageHandler, config: ConsumerConfig, kafkaContext: KafkaContext) {
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  val props = new Properties()
  props.put(KafkaConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.servers)
  props.put(KafkaConsumerConfig.GROUP_ID_CONFIG, config.consumerGroup)

  // Lazy, so the consumer is instantiated on the Kafka thread at first use.
  // The value deserializer must match Array[Byte], hence ByteArrayDeserializer.
  private lazy val kafkaConsumer: KafkaConsumer[String, Array[Byte]] =
    new KafkaConsumer[String, Array[Byte]](props, new StringDeserializer, new ByteArrayDeserializer)

  def close: IO[Unit] = kafkaContext ~> kafkaConsumer.close()

  def subscribe(): IO[Unit] = kafkaContext ~> kafkaConsumer.subscribe(Collections.singletonList(config.topic))

  def consume: IO[Unit] = for {
    messages <- kafkaContext ~> kafkaConsumer.poll(Duration.ofMillis(1000)).iterator().asScala.toList
    _        <- messageHandler.handleMessage(messages)
    _        <- consume
  } yield ()
}

object Consumer {
  def resource(messageHandler: MessageHandler, config: ConsumerConfig, context: KafkaContext): Resource[IO, Consumer] =
    Resource.make(IO(new Consumer(messageHandler, config, context)))(_.close)
}
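One piece is referenced above but never shown: the MessageHandler. For the project to compile you need some implementation of it. Here is a minimal sketch; the class name and the handleMessage signature follow the usage above, and the body (which just prints each message) is illustrative:
MessageHandler.scala
import cats.effect.IO
import org.apache.kafka.clients.consumer.ConsumerRecord

class MessageHandler {
  // Handles a batch of polled records; this sketch just prints the values.
  def handleMessage(messages: List[ConsumerRecord[String, Array[Byte]]]): IO[Unit] =
    IO(messages.foreach(record => println(new String(record.value()))))
}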
Building the Consumer Application
ConsumerApplication.scala
import cats.effect.{ConcurrentEffect, ExitCode, IO, IOApp}
import cats.implicits._ // for *> and .as

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}

object ConsumerApplication extends IOApp {
  implicit private val ec: ExecutionContextExecutor = ExecutionContext.global
  private val cs = IO.contextShift(ec)
  implicit private val concurrentEffect: ConcurrentEffect[IO] = IO.ioConcurrentEffect(cs)

  val config = ConsumerConfig("localhost:9092", "test-topic", "consumer-group-1")

  override def run(args: List[String]): IO[ExitCode] = {
    val applicationResource = for {
      kafkaContext <- KafkaContext.resource(cs)
      consumer     <- Consumer.resource(new MessageHandler, config, kafkaContext)
    } yield consumer

    applicationResource.use(consumer => consumer.subscribe *> consumer.consume).as(ExitCode.Success)
  }
}
Try it Yourself
This is a good time to stop and try out the code we've just explained. You will build an sbt project and add dependencies on Cats, Cats Effect, and the Apache Kafka client, plus Guava, which KafkaContext uses for its ThreadFactoryBuilder.
libraryDependencies ++= Seq(
  "org.typelevel"    %% "cats-core"     % "2.1.1",
  "org.typelevel"    %% "cats-effect"   % "2.1.3",
  "org.apache.kafka" %  "kafka-clients" % "2.5.0",
  // Needed for the ThreadFactoryBuilder used in KafkaContext
  "com.google.guava" %  "guava"         % "29.0-jre"
)
Then, you need to add the four files explained above: KafkaContext.scala, MessageHandler.scala, Consumer.scala, and ConsumerApplication.scala. When you finish, you can run sbt compile to check that everything is correct, then download Kafka from here. Once the download is done, extract it, run ZooKeeper and Kafka, create a topic, start the Scala consumer, and then send messages from the Kafka console producer. You will need at least 4 terminals for this:
tar -xvf kafka_2.13-2.5.0.tgz
cd kafka_2.13-2.5.0
# First terminal: Start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# Second terminal: Start Kafka
bin/kafka-server-start.sh config/server.properties
# Third terminal: Create a topic and then start Kafka producer console
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-topic
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
# Fourth terminal: Start Scala consumer
cd <your Scala code>
sbt run
# [Optional] 5th terminal: Start Kafka consumer console which can be useful for debugging
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
Now, go to the third terminal, type some words, and press enter. Each line becomes a string message, and you should see it in the Scala consumer app running in the fourth terminal.
If you want to see the full ready-to-run code, please check out this branch.
Summary
In this section, we developed a functional Kafka consumer in Scala with the help of Cats and Cats Effect. We saw how to run the Kafka client on its own single thread, since it's not thread-safe, and how easy it is to switch thread pools so we can process messages on a suitable pool, using the ContextShift abstraction provided by Cats Effect. Finally, we tested everything together, including running Kafka and ZooKeeper, and sent messages from the Kafka producer console to our Scala consumer.
In the master branch of the same repo, you can find more complete code that includes testing, error handling, logging, monitoring, and deployment to Kubernetes. If you are interested in any of those topics, please let me know in the comments and I will write about them in future posts.