Pulsar Flink

02 November 2022

Tags : streaming, pulsar, flink, java


I have been messing around with yet another streaming demo (YASD). You really just cannot have too many. 🤩

I am a fan of server-sent events. Why? Because they are HTML5 native, with no messing around with WebSockets. I have a small Quarkus app that generates stock quotes:

that you can easily run locally or on OpenShift:

oc new-app quay.io/eformat/quote-generator:latest
oc create route edge quote-generator --service=quote-generator --port=8080

and then retrieve the events in the browser or with curl:

curl -H "Content-Type: application/json" --max-time 9999999 -N http://localhost:8080/quotes/stream
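Each SSE frame arrives as one or more `data:` lines followed by a blank line. A minimal sketch of pulling the payloads out of a raw SSE chunk, using only the JDK (the field names in the sample payload are illustrative, not taken from the quote-generator app):

```java
import java.util.ArrayList;
import java.util.List;

public class SseParse {
    // Extract the payload of every "data:" line from a raw SSE chunk.
    static List<String> dataLines(String chunk) {
        List<String> out = new ArrayList<>();
        for (String line : chunk.split("\n")) {
            if (line.startsWith("data:")) {
                out.add(line.substring(5).trim());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String chunk = "data: {\"name\":\"NFLX\",\"bid\":101.2,\"ask\":101.5}\n\n"
                     + "data: {\"name\":\"RHT\",\"bid\":88.0,\"ask\":88.3}\n\n";
        for (String payload : dataLines(chunk)) {
            System.out.println(payload);
        }
    }
}
```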

So, first challenge: how might we consume these SSEs using Flink? I found a handy AWS Kinesis SSE demo and snarfed the SSE/OkHttp code from it. I wired this into Flink's RichSourceFunction:

@Override
public void run(SourceContext<String> sourceContext) throws Exception {
    OkHttpClient client = getUnsafeOkHttpClient();
    Request.Builder requestBuilder = new Request.Builder();
    // Create a request and connect using the standard headers for SSE endpoints
    Request request = requestBuilder
            .url(url)
            .header("Accept-Encoding", "")
            .header("Accept", "text/event-stream")
            .header("Cache-Control", "no-cache")
            .build();
    logger.info("SSESource Request created to: " + url);
    EventSourceListener listener = new EventSourceSender(sourceContext, logger, null);
    final EventSource eventSource = EventSources.createFactory(client).newEventSource(request, listener);
    logger.info("SSESource connected");
    try {
        // While we are connected and running we need to hold this thread;
        // SSE events are delivered via a callback on another thread.
        Thread.sleep(10000000);
    } catch (InterruptedException e) {
        logger.error("Sleep timer interrupted");
    }
    eventSource.cancel();
    logger.info("SSESource Stopping event source");
}
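The listener hands events from OkHttp's callback thread over to the thread blocked above. Stripped of OkHttp and Flink, the handoff pattern looks roughly like this (a sketch; the `onEvent` stand-in and queue are my own, not the repo's code):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CallbackHandoff {
    // The SSE library invokes onEvent() on its own I/O thread;
    // the consuming thread blocks on take() until data arrives.
    static final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Stand-in for EventSourceListener.onEvent(...)
    static void onEvent(String data) {
        queue.offer(data);
    }

    public static void main(String[] args) throws InterruptedException {
        Thread producer = new Thread(() -> onEvent("{\"symbol\":\"NFLX\"}"));
        producer.start();
        String event = queue.take(); // blocks until the callback fires
        System.out.println(event);
        producer.join();
    }
}
```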

So now I could consume this SSE source as a DataStream:

DataStream<Quote> nflxPrices = createSourceFromApplicationProperties(env, "NFLX");
DataStream<Quote> rhtPrices = createSourceFromApplicationProperties(env, "RHT");

In the example, I wire in the stock quotes for NFLX and RHT. Next step: process these streams. Since I am new to Flink, I started with a simple print function, then read this stock price example from 2015. Cool! So I implemented a simple BuyFunction class that makes stock buy recommendations:

public class BuyFunction extends KeyedProcessFunction<String, Quote, Buy> {

    Logger logger = LoggerFactory.getLogger(StockBuyer.class);

    private transient ValueState<Boolean> shouldBuy;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        shouldBuy = getRuntimeContext().getState(flagDescriptor);
    }

    @Override
    public void processElement(Quote quote, Context context, Collector<Buy> collector) throws Exception {
        // Get the current state for the current key
        Boolean lastBuyRecommendation = shouldBuy.value();
        if (quote.getBid() > quote.getAsk()) {
            // Set the flag to true
            shouldBuy.update(true);
            // Buy Low, Sell High!
            // If you want to buy a share, you have to pay the ask price.
            // If you want to sell shares, you'll receive the bid price.
            // only buy if two buy recommendations in a row
            //if (lastBuyRecommendation != null && lastBuyRecommendation) {
            Buy buy = new Buy();
            buy.setQuote(quote);
            collector.collect(buy);
            //}
        } else {
            shouldBuy.update(false);
        }
    }
}
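Stripped of Flink, the per-key decision above can be exercised with a plain map standing in for keyed ValueState (a sketch only; in the real job the Flink runtime scopes the state per key for you):

```java
import java.util.HashMap;
import java.util.Map;

public class BuyRuleDemo {
    // One boolean per stock symbol, mimicking Flink's keyed ValueState<Boolean>.
    static final Map<String, Boolean> shouldBuy = new HashMap<>();

    // Returns true when a buy is recommended: the bid has crossed above the ask.
    static boolean processElement(String symbol, double bid, double ask) {
        if (bid > ask) {
            shouldBuy.put(symbol, true);
            return true;
        }
        shouldBuy.put(symbol, false);
        return false;
    }

    public static void main(String[] args) {
        System.out.println(processElement("NFLX", 101.6, 101.5)); // crossed: buy
        System.out.println(processElement("NFLX", 101.2, 101.5)); // normal spread: no buy
        System.out.println(processElement("RHT", 88.4, 88.3));    // crossed: buy
    }
}
```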

Lastly, it needs to be sent to a sink. Again, I started by using a simple print sink:

nflx
    .addSink(new BuySink())
    //.addSink(new PulsarBuySink("NFLX"))
    .name("NFLX buy");

Friends of mine have been telling me how much more awesome Pulsar is compared to Kafka, so I also tried sending to a local Pulsar container that you can run using:

podman run -it -p 6650:6650 -p 8081:8080 --rm --name pulsar docker.io/apachepulsar/pulsar:2.10.2 bin/pulsar standalone

And forwarded to Pulsar using a simple utility class built on the Pulsar Java client:

public class PulsarBuySink implements SinkFunction<Buy> {
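The full class is in the repo; here is a minimal sketch of what such a sink can look like with the Pulsar Java client. The topic name matches the consume command below, but the lazy-initialisation shape and `buy.toString()` serialisation are my assumptions, not the repo's exact code:

```java
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class PulsarBuySink implements SinkFunction<Buy> {

    private transient PulsarClient client;
    private transient Producer<String> producer;

    @Override
    public void invoke(Buy buy, Context context) throws Exception {
        if (producer == null) {
            // Create the client lazily so it is built on the task manager,
            // not serialised from the job client.
            client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .build();
            producer = client.newProducer(Schema.STRING)
                    .topic("persistent://public/default/orders")
                    .create();
        }
        producer.send(buy.toString());
    }
}
```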

Then consume the messages to make sure they are there!

podman exec -i pulsar bin/pulsar-client consume -s my-subscription -n 0 persistent://public/default/orders

And I need to finish writing this post as well .. plus getting it to run in OpenShift …



Source code is here - https://github.com/eformat/flink-stocks
