Reactive log stream processing with RxJava - Part II

Datetime: 2016-08-23 04:41:35

In the previous post we saw how we can add a push-based solution (RabbitMQ) to our "ELK" stack, how we can connect from Spring to RabbitMQ, and how to have the log events emitted as a reactive stream.

Json file as the source of log events

Since our main target is to play around as easily as possible with RxJava operators, we'll simulate receiving the events from RabbitMQ by reading them from a JSON file instead.

This way we don't need to start the full RabbitMQ+Logstash+log-emitter stack (through docker-compose) to test our operator chain setup, and having the same events emitted in a deterministic manner helps us track down where we went wrong.

We'll be using Spring Profiles to switch between the ways the events are generated, so apart from the AmqpSourceEmitterConfiguration we'll have a JsonFileEmitterSourceConfiguration which pushes the JSON entries onto the PublishSubject (to be consistent with how the AMQP MessageListenerAdapter does it).

public class JsonFileEmitterSourceConfiguration {

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }

    @Bean(name = "events")
    public Observable<JsonObject> emitEvents(Receiver receiver) {
        startEmitting(receiver);
        return receiver.getPublishSubject();
    }

    private void startEmitting(Receiver receiver) {
        PublishSubject<JsonObject> publishSubject = receiver.getPublishSubject();

        Supplier<Integer> waitForMillis = () -> 200; //force a little fixed delay between the emitted events
        Executors.newSingleThreadExecutor() //we start a single separate thread to simulate the same conditions as the AMQP listener
            .submit(() -> produceEventsFromJsonFile(publishSubject, waitForMillis));
    }

    private void produceEventsFromJsonFile(Observer<JsonObject> subscriber, Supplier<Integer> waitTimeMillis) {
        JsonArray events = Json.readJsonArrayFromFile("events.json");
        events.forEach(ev -> {
            JsonObject jsonObject = (JsonObject) ev;
            log.info("Emitting {}", Json.propertyStringValue("message").call(jsonObject));
            subscriber.onNext(jsonObject);
            try {
                Thread.sleep(waitTimeMillis.get()); //pause between events
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}

So now we have a Spring bean named "events" which is an Observable&lt;JsonObject&gt; - the main stream of events.

and we can switch between profiles at start time:

public static void main(String[] args) throws Exception {
    AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
    context.getEnvironment().setActiveProfiles("json-file"); //assumed profile name; switch to the AMQP profile as needed
    context.register(JsonFileEmitterSourceConfiguration.class);
    context.refresh();
}

Up to this point, even though events are being emitted (since we're using a hot observable), nobody has subscribed yet, so the events are simply lost.
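To make the "lost events" behaviour concrete, here is a minimal plain-Java sketch (an analogue for intuition, not the article's code and not RxJava itself) of a hot source: it keeps no buffer, so anything emitted while the subscriber list is empty is simply discarded, just like events pushed onto the PublishSubject before anyone subscribes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class HotSourceDemo {
    //a minimal hot source: it pushes events only to the subscribers
    //registered at emission time, with no replay and no buffering
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    public void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    public void emit(String event) {
        //if 'subscribers' is empty, the event is dropped on the floor
        subscribers.forEach(s -> s.accept(event));
    }
}
```

Emitting before subscribing loses the event; only events emitted after subscribe() reach the subscriber.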

Let's start by just displaying the failed login events, and in the end show how many failed logins there were from each IP.

The events being emitted are just some simulated login events that we load from events.json, which look like:

[
  {
    "appName": "elk-testdata",
    "level": "INFO",
    "logger_name": "ro.fortsoft.elk.testdata.generator.event.LoginEvent",
    "thread_name": "pool-3-thread-4",
    "message": "SUCCESS login for user\u003d\\u0027",
    "remoteIP": "",
    "@timestamp": "2016-06-30T11:33:40.872Z",
    "host": ""
  },
  {
    "appName": "elk-testdata",
    "level": "ERROR",
    "logger_name": "ro.fortsoft.elk.testdata.generator.event.LoginEvent",
    "thread_name": "pool-3-thread-6",
    "message": "FAILED login for user\u003d\\u0027",
    "remoteIP": "",
    "@timestamp": "2016-06-30T11:33:40.874Z",
    "host": ""
  }
]
public class LoginObservables {

    @Autowired
    protected Observable<JsonObject> events;

    Observable<JsonObject> failedLogins() {
        return events
            .filter(Json.checkPropertyFunc("message", val -> val.startsWith("FAILED login")))
            .doOnNext(jsonObject -> log.info("Debug {}", Json.propertyStringValue("message").call(jsonObject)));
    }

    @Bean
    public Subscription displayLogEventsSubscription() {
        return failedLogins()
            .subscribe(jsonObject -> log.info("Got {}", Json.propertyStringValue("message").call(jsonObject)));
    }
}

Remember: until we connect the "sink" (the subscriber), events do not travel down the operator chain - the operator functions are not executed.

We can prove this by adding 'doOnNext()' - a simple way to interpose an action in the chain, great for debugging purposes.

We registered the subscriber by annotating the 'displayLogEventsSubscription' method with @Bean, so it is invoked automatically by Spring.

Without the Subscription bean the 'Debug' message is not seen.

Too many failed logins from the same ips

Now let's dive right in and try to see the cases where there are too many failed logins from a certain ip.

First we need a time range over which to measure the failed logins, so we get a result like X failed logins / 10 secs.

We can use the window operator for this.

The window(long timespan, TimeUnit unit) operator splits the event stream into multiple windows of fixed duration. It returns a stream of streams (Observable&lt;Observable&lt;T&gt;&gt;), each inner stream emitting the events in its time range.

Observable<Observable<JsonObject>> windowStream = failedLogins().window(30, TimeUnit.SECONDS);

Sidenote: You might wonder why the return value is not a single continuous stream. Well, because the window operator needs to signal onComplete once it has reached the specified time value, and the reactive streams spec says that you cannot emit new events after onComplete.

There is also an overloaded version, window(long timespan, long timeshift, TimeUnit unit) - it creates a new window with the given timespan every 'timeshift', in which case there may be overlapping windows.
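To build intuition for how the non-overlapping form partitions the stream, here is a small plain-Java sketch (an analogue, not the RxJava operator): each timestamped event is assigned to bucket number timestamp / windowMillis, producing the same fixed windows that window(30, TimeUnit.SECONDS) produces over time.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WindowDemo {
    //assigns each event timestamp (in millis) to a fixed, non-overlapping
    //window: window k covers [k * windowMillis, (k + 1) * windowMillis)
    public static Map<Long, List<Long>> fixedWindows(List<Long> timestamps, long windowMillis) {
        return timestamps.stream()
                .collect(Collectors.groupingBy(ts -> ts / windowMillis));
    }
}
```

For example, with a 30-second window, events at 1s and 29s land in window 0 while an event at 31s lands in window 1; the real operator additionally closes each inner Observable with onComplete when its time is up.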

Flatmap operator

Now the tricky part is that we need to register for events on each stream. This is where the flatMap operator comes in.

The flatMap operator is like a swiss army knife; it can be used for all sorts of use cases.

1. The fork-join. Consider the common case where for each event we need to make remote calls to other services. Each of those remote calls might return a list of objects (though in the reactive streams world you might want to return an Observable instead of a List). So each event might start other streams, and we register the downstream for notifications from each of them.

T --> Observable<T>

This looks similar to a fork-join operation.

So the 3, 2, 1 events make remote calls that each return an Observable&lt;Integer&gt;, while zs is the resulting inner stream (the flatten operation) that downstream operators subscribe to.

We'll see later a concrete example of this.

Sidenote: The thing to notice from the image is that there is no longer an ordering guarantee. The events from the merged streams appear as soon as they are emitted and might get interleaved.

PS: Should we want to maintain order we can use concatMap , but in our particular case, this is not needed.
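The fork-join shape can be sketched with plain java.util.stream as well (a hedged analogue, where fakeRemoteCall is a hypothetical stand-in for the remote service call, not something from the article's code): each event fans out into several results, and flatMap merges them back into one flat sequence.

```java
import java.util.List;
import java.util.stream.Collectors;

public class FlatMapDemo {
    //hypothetical remote call: each event expands into several results
    static List<String> fakeRemoteCall(int event) {
        return List.of(event + "-a", event + "-b");
    }

    //the fork-join shape: fan out per event, then flatten into one sequence
    public static List<String> expandAll(List<Integer> events) {
        return events.stream()
                .flatMap(ev -> fakeRemoteCall(ev).stream())
                .collect(Collectors.toList());
    }
}
```

Unlike Observable.flatMap over asynchronous sources, this synchronous version preserves order - with real async streams you would need concatMap to get the same guarantee.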

2. The stream 'walker'. There are already multiple streams emitting events, and we want the downstream operators to react to events that appear on each of them. This is the use case we need right now. As a rule of thumb, when you have an Observable&lt;Observable&lt;T&gt;&gt; you probably need flatMap next.

Observable<Observable<JsonObject>> windowStream = failedLogins().window(30, TimeUnit.SECONDS);

Observable<JsonObject> windowEvents = windowStream.flatMap(jsonWindowEventObservable -> jsonWindowEventObservable); //notice we now have a single stream as return value

The statement

flatMap(jsonWindowEventObservable -> jsonWindowEventObservable)

looks like it's not doing anything - the map function returns each inner stream unchanged - but the events from all the windows are now flattened into a single stream.

Since we want to count the number of failed logins per IP, we need the groupBy operator. It takes as parameter a function which gives the grouping key - in our case, a function which returns the 'remoteIP' field of the failed-login json event.

The groupBy operator returns a stream of streams, each new ip starts a new stream, while already "seen" ips emit events on their stream.

Observable<GroupedObservable<String, JsonObject>> groupedEventsPerIp =
        windowEvents.groupBy(Json.propertyStringValueFunc("remoteIP"));

This is another reason why we first chose to use window on the initial stream of log events.

Since the initial stream is potentially infinite, groupBy would split it into a potentially infinite number of streams - a recipe for OutOfMemoryError.

Now we have to count the number of events in each per-ip substream. We again use flatMap to register a function for each substream, perform some calculation, and then "flatten" the result back into a single stream.

.flatMap(groupEventForIP -> groupEventForIP
        .count()
        .map(failedLoginsCount -> {
            final String remoteIp = groupEventForIP.getKey();
            return new Pair<>(remoteIp, failedLoginsCount);
        }))

The flatMap operator receives as parameter a groupEventForIP, which is a GroupedObservable&lt;String, JsonObject&gt; (the String key being the remoteIP value) - so it's still a stream, which means we can go ahead and apply stream operators on this substream.

Since we need to count how many failed logins there were, we just apply the count() operator, which emits the total that we then use to pass downstream a Pair&lt;remoteIP, failedLoginsCount&gt;.
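Per window, the end result of groupBy + count() is equivalent to this plain-Java aggregation (a sketch of the semantics only, not the reactive implementation): collect the remoteIP of each failed login and count occurrences per key.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByCountDemo {
    //per-window semantics of groupBy(remoteIP) followed by count():
    //one (ip, failedLoginsCount) entry per distinct ip seen in the window
    public static Map<String, Long> failedLoginsPerIp(List<String> remoteIps) {
        return remoteIps.stream()
                .collect(Collectors.groupingBy(ip -> ip, Collectors.counting()));
    }
}
```

The reactive version produces the same pairs, just incrementally and one Pair at a time as each per-ip substream completes.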

All together this looks like:

Observable<Pair<String, Integer>> failedLoginsPerWindow(int windowSecs) {
    return failedLogins()
            .window(windowSecs, TimeUnit.SECONDS)
            .flatMap(jsonWindowEventObservable -> jsonWindowEventObservable
                        .groupBy(Json.propertyStringValueFunc("remoteIP"))
                        .flatMap(groupEventForIP -> groupEventForIP
                            .count()
                            .map(failedLoginsCount -> {
                                final String remoteIp = groupEventForIP.getKey();
                                return new Pair<>(remoteIp, failedLoginsCount);
                            })));
}

@Bean
public Subscription failedLoginsPerIpSubscription() {
    return failedLoginsPerWindow(10) //10 secs window
            .filter(failedLoginPair -> failedLoginPair.getValue() > minFailedLoginAttempts)
            .subscribe(failedLoginPair -> log.info("Possible brute force login: {}", failedLoginPair));
}
