Message Broker in Lagom using Kafka

Datetime: 2017-04-17 05:31:04    Topic: Apache Kafka, Java

What is Lagom?

The Lagom framework simplifies the development of microservices by providing an integrated development environment. This lets developers focus on solving business problems instead of wiring services together.

Lagom exposes two APIs, Java and Scala, and provides a framework and development environment as a set of libraries and build tool plugins. The build tools Lagom supports are Maven and sbt. You can use Maven with Java, or sbt with Java or Scala.

Message Broker Support in Lagom

If microservices communicate synchronously, both the sender and the receiver have to be running at the same time. This can lead to consistency problems if messages are missed, and it can make the system brittle: a failure in one component can bring down the complete system.

As a solution to this, one can use an infrastructure component to enable services to communicate asynchronously. This component is referred to as a message broker.
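
The decoupling described above can be illustrated with a minimal plain-Java sketch. This is not Lagom or Kafka code: InMemoryTopic and AsyncDemo are hypothetical names, and the in-memory queue only stands in for a broker, which in practice would also persist and replicate messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a message broker topic: a buffered queue.
final class InMemoryTopic {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();

    // The sender only needs the broker to be available, not the receiver.
    void publish(String message) {
        buffer.add(message);
    }

    // The receiver picks up whatever accumulated while it was offline.
    List<String> drain() {
        List<String> received = new ArrayList<>();
        buffer.drainTo(received);
        return received;
    }
}

final class AsyncDemo {
    public static void main(String[] args) {
        InMemoryTopic greetings = new InMemoryTopic();

        // The receiver is "down" at this point; publishing still succeeds.
        greetings.publish("Hello, Alice");
        greetings.publish("Hello, Bob");

        // Later the receiver starts and catches up on the buffered messages.
        System.out.println(greetings.drain());
    }
}
```

The sender never calls the receiver directly, so a receiver outage delays delivery instead of failing the sender.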

To support this, Lagom provides a Message Broker API which makes it very simple for the services to share data asynchronously.

Currently, Lagom supports one implementation of the Message Broker API, which uses Kafka.

Publishing to a Kafka Topic

To publish data to a Kafka topic, the topic needs to be declared in the service descriptor. Let us look at how this is done with an example:

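import akka.Done;
import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.Descriptor;
import com.lightbend.lagom.javadsl.api.Service;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.api.broker.Topic;

import static com.lightbend.lagom.javadsl.api.Service.named;
import static com.lightbend.lagom.javadsl.api.Service.pathCall;
import static com.lightbend.lagom.javadsl.api.Service.topic;

public interface HelloService extends Service {

    // Name of the Kafka topic this service publishes to.
    String GREETINGS_TOPIC = "greeting";

    ServiceCall<NotUsed, String> hello(String id);

    ServiceCall<GreetingMessage, Done> useGreeting(String id);

    @Override
    default Descriptor descriptor() {
        return named("helloservice").withCalls(
                pathCall("/api/hello/:id", this::hello),
                pathCall("/api/hello/:id", this::useGreeting)
        ).publishing(
                // Declare the topic(s) this service publishes to.
                topic(GREETINGS_TOPIC, this::greetingsTopic)
        ).withAutoAcl(true);
    }

    // Handle to the topic; the payload type is assumed to be GreetingMessage.
    Topic<GreetingMessage> greetingsTopic();
}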

Here, the Descriptor.publishing method accepts a sequence of topic calls. Each topic call is defined via the Service.topic static method.

Service.topic takes a topic name and a reference to a method that returns a Topic instance. Here, the topic name is “greeting” and the method reference is greetingsTopic().

Implementing a Topic

The primary source of messages that Lagom is designed to produce is persistent entity events. Lagom’s TopicProducer API provides the following methods for publishing a persistent entity’s event stream:

  • singleStreamWithOffset
    • used for non-sharded read-side event streams
    • Lagom ensures that the topic producer runs on only one node of your cluster
  • taggedStreamWithOffset
    • used for sharded read-side event streams
    • Lagom distributes the tags evenly across the cluster to spread the publishing load
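
To make the idea of sharded tags concrete, here is a minimal plain-Java sketch of how events could be grouped into a fixed number of tags and how those tags could be spread over cluster nodes. It is an illustration under stated assumptions, not Lagom's actual algorithm: ShardingSketch, tagFor, assign, the hash-based tag choice, and the round-robin assignment are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ShardingSketch {

    // Assumption: an entity always maps to the same tag, chosen by hashing its ID.
    static String tagFor(String entityId, String baseTag, int numShards) {
        int shard = Math.floorMod(entityId.hashCode(), numShards);
        return baseTag + shard;
    }

    // Assumption: tags are handed out round-robin so each node publishes a similar share.
    static Map<String, List<String>> assign(List<String> tags, List<String> nodes) {
        Map<String, List<String>> byNode = new TreeMap<>();
        for (String node : nodes) {
            byNode.put(node, new ArrayList<>());
        }
        for (int i = 0; i < tags.size(); i++) {
            byNode.get(nodes.get(i % nodes.size())).add(tags.get(i));
        }
        return byNode;
    }

    public static void main(String[] args) {
        List<String> tags = Arrays.asList("HelloEvent0", "HelloEvent1", "HelloEvent2", "HelloEvent3");
        List<String> nodes = Arrays.asList("node-1", "node-2");
        System.out.println(tagFor("alice", "HelloEvent", 4));
        System.out.println(assign(tags, nodes));
    }
}
```

Because each entity's events always land on the same tag, ordering per entity is preserved while the publishing work is split across nodes.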

Example of publishing to a single, non-sharded event stream:

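public Topic<GreetingMessage> greetingsTopic() {
    // Stream the persistent entity events from the given offset and
    // convert each one into the message published to the topic.
    return TopicProducer.singleStreamWithOffset(offset ->
            persistentEntityRegistry.eventStream(HelloEventTag.INSTANCE, offset)
                    .map(this::convertEvent));
}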

As soon as the service starts, the read-side event stream you passed to the topic producer starts as well. That means all events persisted by your service will eventually be published to the connected topic.
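
The producer methods take an offset, which suggests that publishing can resume from where it left off rather than starting over. The following plain-Java sketch illustrates that idea only; it is not Lagom's implementation. OffsetTrackingPublisher and its methods are hypothetical, and the offset is kept in memory here, whereas a real system would need to store it durably.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class OffsetTrackingPublisher {
    private final List<String> eventLog;                   // persisted events, oldest first
    private final List<String> topic = new ArrayList<>();  // stand-in for the Kafka topic
    private int nextOffset = 0;                            // assumption: stored durably in practice

    OffsetTrackingPublisher(List<String> eventLog) {
        this.eventLog = eventLog;
    }

    // Publish at most `limit` events, starting from the stored offset.
    void publishPending(int limit) {
        int end = Math.min(eventLog.size(), nextOffset + limit);
        while (nextOffset < end) {
            topic.add(eventLog.get(nextOffset));
            nextOffset++;
        }
    }

    List<String> published() {
        return topic;
    }

    public static void main(String[] args) {
        OffsetTrackingPublisher publisher =
                new OffsetTrackingPublisher(Arrays.asList("greeted-1", "greeted-2", "greeted-3"));
        publisher.publishPending(2);   // first run is interrupted after two events
        publisher.publishPending(10);  // the next run resumes at the stored offset
        System.out.println(publisher.published());
    }
}
```

In this sketch the second run publishes only the remaining event, so every persisted event reaches the topic without the earlier ones being sent again.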

Subscribing to a Topic

To subscribe to a topic, a service needs to call Topic.subscribe() on the topic of interest. Let us look at how this is done by subscribing to the greetingsTopic used in the earlier examples.

helloService.greetingsTopic().subscribe()
    .atLeastOnce(Flow.fromFunction(this::displayMessage));

Here, subscribe() returns a Subscriber instance for the greetings topic, and atLeastOnce consumes it with at-least-once semantics. At-least-once means that each message published to the greetings topic is received at least once, but possibly more than once.
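
Since a message may arrive more than once, the handler should tolerate duplicates. One common approach is to make it idempotent, as in this minimal plain-Java sketch. It assumes each message carries a unique ID, which the example above does not show; IdempotentGreeter, handle, and displayedCount are hypothetical names, and the set of seen IDs is kept in memory only.

```java
import java.util.HashSet;
import java.util.Set;

final class IdempotentGreeter {
    private final Set<String> seenIds = new HashSet<>();
    private int displayed = 0;

    // Returns true if the message was processed, false if it was a duplicate.
    boolean handle(String messageId, String text) {
        if (!seenIds.add(messageId)) {
            return false; // already handled on an earlier delivery
        }
        displayed++;
        System.out.println(text);
        return true;
    }

    int displayedCount() {
        return displayed;
    }

    public static void main(String[] args) {
        IdempotentGreeter greeter = new IdempotentGreeter();
        greeter.handle("msg-1", "Hello, Alice");
        greeter.handle("msg-1", "Hello, Alice"); // redelivery is ignored
        greeter.handle("msg-2", "Hello, Bob");
        System.out.println(greeter.displayedCount());
    }
}
```

With this check in place, a redelivered message has no additional effect, so at-least-once delivery behaves like exactly-once processing from the handler's point of view.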

For a hands-on working example of what this blog covers, please refer to this activator template.
