## Blog: Mr. Mathiesen

This post is part of the F# Advent Calendar in English 2021, hosted on Sergey Tihon's blog. Please check out the other posts as well.

I have participated in this (fun _ -> async { let! event = Async.AwaitEvent … }) on two other occasions. Perhaps, if you find this blog post insightful, you might want to have a read of these other two as well:

• 2016: “Semantic Versioning .NET libraries and NuGet packages”.
• 2017: “Bloom Filter Sort (bfsort)”.

### Background

First of all, let's try to explain the title of this post and why it's being used. There are two main parts:

• Poor man’s: The definition in the Cambridge English Dictionary states that it is: «used to refer to something that is a worse or cheaper version of something else that is mentioned».

Note: It's not always the case that just because something is cheaper, it's worse. As an example, let's take the Black Velvet beer cocktail.

• Kafka (stream-processing bus): The definition at Wikipedia Apache Kafka states that it: «is a framework implementation of a software bus using stream-processing. It is an open-source software platform developed by the Apache Software Foundation written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds … Kafka uses a binary TCP-based protocol that is optimized for efficiency and relies on a message set abstraction that naturally groups messages together to reduce the overhead of the network roundtrip. This leads to larger network packets, larger sequential disk operations, contiguous memory blocks […] which allows Kafka to turn a bursty stream of random message writes into linear writes».

The key elements are:

• Service bus using stream-processing
• High-throughput, low-latency platform for handling real-time data feeds
• Uses a binary TCP-based protocol that is optimized for efficiency
• Groups messages together to reduce the overhead of the network roundtrip
• Larger: network packets, sequential disk operations & contiguous memory blocks

which are the features that we will mimic in our Poor man’s implementation.

Note: For a short, concise and comprehensive overview, I highly suggest you watch the video Apache Kafka Explained (Comprehensive Overview) on YouTube, as it will give you a better understanding of the terminology I will use later on in this blog post.

### Poor man’s Kafka from scratch

Now that we have an understanding of what we are trying to do, let's implement it from scratch in F# (*).

(*) - There are only a few places where I had to rely on C#: LINQ, because F#'s Seq.skip isn't safe (it throws an exception when the sequence has too few elements), and Console.WriteLine, for thread-safety.
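As a quick illustration of the Seq.skip issue, here is a minimal sketch (the safeSkip helper and its name are my own invention, not taken from the source code):

```fsharp
open System.Linq

// F#'s Seq.skip raises an exception when the sequence has fewer
// elements than requested, whereas LINQ's Skip simply returns an
// empty sequence. Wrapping the LINQ method gives a safe variant:
let safeSkip (count: int) (xs: seq<'a>) = xs.Skip(count)

safeSkip 5 Seq.empty<int> |> Seq.toList   // []
safeSkip 1 (seq { 1; 2; 3 }) |> Seq.toList // [2; 3]
```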

#### Prerequisites

Before we can write any code, I first need to install .NET 6 on my operating system. I have used NixOS Linux for many years, without any doubt the best operating system out there, and I'm currently transitioning to macOS due to the new powerful and battery-friendly Apple Silicon chips.

For both systems, I'm using nix, the package-manager that spawned an operating system. The power of nix allows you to define shell scripts in a pure and lazy, but sadly untyped, functional programming language. Here is a sample of how you would write a shell script to retrieve a specific version of dotnet:
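A sketch of what such a shell.nix could look like (the exact nixpkgs attribute name and pinned SDK version are assumptions on my part and may differ across nixpkgs revisions):

```nix
{ pkgs ? import <nixpkgs> {} }:

pkgs.mkShell {
  # Attribute name varies by nixpkgs revision,
  # e.g. dotnetCorePackages.sdk_6_0
  buildInputs = [ pkgs.dotnet-sdk_6 ];

  # Opt out of telemetry and avoid ICU-related crashes on *nix/macOS
  shellHook = ''
    export DOTNET_CLI_TELEMETRY_OPTOUT=1
    export DOTNET_SYSTEM_GLOBALIZATION_INVARIANT=1
  '';
}
```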

which can then be stored as a shell.nix file and placed in the folder of a given project. In order to retrieve the specific dotnet version, you just execute nix-shell from a terminal; you will then be able to use dotnet with the chosen (SDK) version, ONLY from that terminal.

Note: One of the important parts here is the shellHook, which helps you disable a few of Microsoft's intrusive privacy misbehaviors and enable a few settings without which dotnet applications usually crash on *nix and macOS.

#### Structure

For the sake of simplicity, I have created a few libraries (files suffixed with .fs) containing the logic shared between the executables (files suffixed with .fsx).

Note: In order to execute F# scripts seamlessly across *nix and macOS, the following header is added at the top of the .fsx files, which are also made executable from the terminal with chmod +x ….fsx
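The header in question is presumably a shebang along these lines (a sketch; the -S flag lets env split "dotnet fsi" into program and argument):

```fsharp
#!/usr/bin/env -S dotnet fsi

// ... the rest of the .fsx script follows here ...
```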

In this section, we will only describe, with some degree of detail, what the following files contain. For a full view of the implementation, please have a look at the Appendix: Source Code at the end of this blog post.

• common: The library mainly contains the TCP Sockets module, the Kafka Protocol and a few helper modules:

• Error module: Wraps dotnet exceptions as F# record types.

• Date module: Provides ISO-8601 UTC timestamps.

• Input module: Exposes logic to wait for a given ConsoleKey to be pressed. Indispensable when working with Async.Choice and terminal CLI.

• Output module: Provides thread-safe writes to the terminal. Sadly, the built-in printf in F# isn't.
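As a rough sketch of what the Date and Output helpers might look like (the function names and formatting details here are my assumptions, not necessarily those of the actual modules):

```fsharp
open System

module Date =
    // ISO-8601 ("round-trip") UTC timestamp,
    // e.g. 2021-11-29T21:46:25.2901362Z
    let timestamp () = DateTime.UtcNow.ToString "o"

module Output =
    // Unlike F#'s printf, Console.WriteLine writes its argument in a
    // single call, so concurrent writers don't interleave within a line.
    let writeLine (message: string) =
        Console.WriteLine (sprintf "%s | %s" (Date.timestamp ()) message)
```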

• UID module: Generates unique identifiers, which are used to ensure that events aren't stored in the same location on disk.

• TCP module: A server and client based on Berkeley sockets. The module is completely agnostic to the Kafka Protocol, so it could be reused in other projects that rely on low-level POSIX-socket communication. TCP.Server.listen spawns a new async process whenever a new socket connection is established.
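The accept loop of such a server can be sketched as follows (a simplified assumption of mine; the real module surely handles errors and shutdown more carefully):

```fsharp
open System.Net
open System.Net.Sockets

module TCP =
    module Server =
        /// Accepts connections forever, spawning a concurrent async
        /// workflow (the application) for each established socket.
        let listen (port: int) (app: Socket -> Async<unit>) = async {
            let listener =
                new Socket(AddressFamily.InterNetwork,
                           SocketType.Stream, ProtocolType.Tcp)
            listener.Bind(IPEndPoint(IPAddress.Loopback, port))
            listener.Listen 64
            while true do
                let! socket = Async.AwaitTask (listener.AcceptAsync())
                // Each connection is handled independently
                Async.Start (app socket)
        }
```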

• Kafka Protocol module: Contains the data types and binary converters (from/to bytes) as well as the network-package specification. A few tricks are used to ensure that protocol elements are created as expected: single-case union constructors are made private and then exposed with active patterns to allow type deconstruction by consuming clients.
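The private-constructor/active-pattern trick looks roughly like this (the Topic type and its validation rule are invented for illustration, not taken from the actual protocol):

```fsharp
module Protocol =

    // The constructor is private: clients cannot build arbitrary values …
    type Topic = private Topic of string

    module Topic =
        // … they must go through a validating factory function …
        let create (name: string) =
            if name.Length > 0 && name.Length <= 255
            then Some (Topic name)
            else None

    // … but an active pattern still lets them deconstruct the type.
    let (|Topic|) (Topic name) = name

open Protocol

match Protocol.Topic.create "foobar" with
| Some (Topic name) -> printfn "valid topic: %s" name
| None -> printfn "invalid topic"
```

This way every Topic value that exists in the program is known to have passed validation, while pattern matching remains as convenient as with a public union case.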

• broker: A simple server, without a cluster component, which will interact with both publishing as well as subscribing clients:

• app function: The main (and recursive) application that is fed to the TCP.Server.listen function. It sends a welcome message and expects the client to respond by specifying whether it's a publisher or a subscriber, as well as the topic. Based on the client's response, it then dispatches to either the producer or the consumer function.

• producer function: While there is a connection with the publisher, the broker will receive (recursively) batches of messages for the given topic. The batches are stored in their binary form on disk, with filenames prefixed by ISO-8601 UTC timestamps, which allows the broker to serve those batches in an orderly and timely manner.

• consumer function: When a subscriber connects to the broker, it provides an index, which limits the amount of batches the broker initially serves, as it is the subscriber's responsibility to keep local state of what data it has already received from the broker. Once these batches are sent to the subscriber, the consumer function calls the created function.

• created function: As with the producer function, while there is a connection with the subscriber, the broker will (recursively) listen to the specified topic folder, so whenever a new batch arrives, it will be sent immediately to the subscriber to provide real-time data feeds.
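The recursive listen-to-a-topic-folder loop that the broker's created function (and, later, clipub's producer) relies on might be sketched like this (the polling interval, file layout and the watch/send names are assumptions of mine):

```fsharp
open System.IO

/// Tail-recursively polls a topic folder and hands any file newer than
/// the last seen one to `send`. Because filenames are prefixed with
/// ISO-8601 UTC timestamps, a plain string comparison preserves
/// arrival order.
let rec watch (topic: string) (lastSeen: string) (send: string -> Async<unit>) = async {
    let newer =
        Directory.GetFiles topic
        |> Array.sort
        |> Array.filter (fun file -> file > lastSeen)
    for file in newer do
        do! send file
    do! Async.Sleep 500
    let lastSeen = if Array.isEmpty newer then lastSeen else Array.last newer
    return! watch topic lastSeen send
}
```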

• domain: A very basic event data-type specification, which is not known by the broker, and a random-helper module:

• Domain module: Basic event-type specification.

• Random module: Helper to generate batches of events for testing/demo purposes.

• clipub: A simple command line interface (CLI) used by publishers to retrieve human-readable events, convert them to a commonly agreed binary format and send them to the broker:

• json function: Deserializes human-readable events.

• bson function: Serializes events to a binary format.

• app function: The main (and recursive) application that is fed to the TCP.Client.interact function. It waits for the broker to send a welcome message and then responds by specifying that it's a publisher (0x10uy), as well as the topic. Afterwards it calls the producer function.

• producer function: While there is a connection with the broker, the producer will (recursively) listen to a specified topic folder; whenever a new batch of events arrives, it retrieves them, deserializes the human-readable format and serializes them to the agreed binary format. Once this is done, the batch is sent to the broker for persistent storage.

• clisub: Another simple command line interface (CLI), used by subscribers to retrieve binary-formatted batches from the broker, convert them to human-readable events and store them locally:

• bson function: Deserializes binary-formatted batches.

• json function: Serializes to human-readable events.

• app function: The main (and recursive) application that is fed to the TCP.Client.interact function. It waits for the broker to send a welcome message and then replies by specifying that it's a subscriber (0x20uy), as well as the topic, which contains an index retrieved from local state to ensure that the broker doesn't re-send already received batches. Afterwards it calls the consumer function.

• consumer function: While there is a connection with the broker, the consumer will receive (recursively) batches of messages for the given topic. The binary formatted batches will be deserialized and then serialized to a human-readable format and stored locally on disk, in an orderly and timely manner, by prefixing filenames with ISO-8601 UTC timestamps.

• create: A simple helper-tool used to create random batches of human-readable events.

• remove: A simple helper-tool to reset server and clients state.

### Demo

Here are the steps that were used for the demo:

1. The broker was started:
2021-11-29T21:46:25.2901362Z | STARTED | BROKER | Poor Man's Kafka

2. Then the publisher was started:
2021-11-29T21:46:30.2332728Z | STARTED | CLIPUB | Poor Man's Kafka
…
2021-11-29T21:46:30.2763232Z | VERBOSE | RECV   | (15,16,[|0uy; …; 114uy|])

3. Then the subscriber was started:
2021-11-29T21:46:30.9806649Z | STARTED | CLISUB | Poor Man's Kafka
…
2021-11-29T21:46:31.0159547Z | VERBOSE | RECV   | (15,32,[|0uy; …; 114uy|])

4. Then the event creator (4 and 22) was executed:
2021-11-29T21:46:41.1228100Z | CREATED | EVENTS | Random events: 4
…
2021-11-29T21:46:45.0834605Z | CREATED | EVENTS | Random events: 22

5. Then the subscriber was stopped:
2021-11-29T21:46:51.1911832Z | STOPPED | CLISUB | Poor Man's Kafka
…
2021-11-29T21:46:57.4568349Z | FAILURE | BROKER | Consumer:

6. Then the event creator (17 and 16) was executed again:
2021-11-29T21:46:57.4226179Z | CREATED | EVENTS | Random events: 17
…
2021-11-29T21:47:01.4008672Z | CREATED | EVENTS | Random events: 16

7. Then the subscriber was started again, catching up:
2021-11-29T21:47:26.2142101Z | STARTED | CLISUB | Poor Man's Kafka
2021-11-29T21:47:26.2575274Z | VERBOSE | RECV   | Topic: foobar (bytes: 0000012287)
2021-11-29T21:47:26.3647421Z | VERBOSE | RECV   | Topic: foobar (bytes: 0000012804)
…
2021-11-29T21:47:26.2494125Z | VERBOSE | RECV   | (15,32,[|0uy; …; 114uy|])
2021-11-29T21:47:26.2535665Z | VERBOSE | SEND   | Topic: foobar (bytes: 0000012287)
2021-11-29T21:47:26.2546217Z | VERBOSE | SEND   | Topic: foobar (bytes: 0000012804)

8. Then the event creator (15 and 9) was executed again:
2021-11-29T21:47:38.5005655Z | CREATED | EVENTS | Random events: 15
…
2021-11-29T21:47:42.4589979Z | CREATED | EVENTS | Random events: 9


### Conclusion

It's worth mentioning that this code was produced over a weekend (at most 72 hours), once I noticed (the deadline) that I had committed to write this blog post for the first of December. It's also worth mentioning that for many years I haven't been able to make FsAutoComplete work for Emacs on any of my *nix boxes, nor now on macOS. So I have actually written all this code without any IntelliSense. Just so Windows people understand: it's like starting notepad.exe and just typing code into it 😅.

Nevertheless, if we recall the key elements of Apache Kafka:

• Service bus using stream-processing
• High-throughput, low-latency platform for handling real-time data feeds
• Uses a binary TCP-based protocol that is optimized for efficiency
• Groups messages together to reduce the overhead of the network roundtrip
• Larger: network packets, sequential disk operations & contiguous memory blocks

It should be clear that these key elements are all part of this implementation, even though it might not be production-ready.

Finally, I hope you have enjoyed reading this blog post. I guess the last thing to say is: Felices fiestas (happy holidays)!!!