02 November 2022
Tags : streaming, pulsar, flink, java
I have been messing around with yet another streaming demo (YASD). You really just cannot have too many. 🤩
I am a fan of server-sent events. Why? Because they are HTML5 native: no messing around with web sockets. I have a small Quarkus app that generates stock quotes, and you can easily run it locally or on OpenShift:
oc new-app quay.io/eformat/quote-generator:latest
oc create route edge quote-generator --service=quote-generator --port=8080
and then retrieve the events in the browser or by curl:
curl -H "Content-Type: application/json" --max-time 9999999 -N http://localhost:8080/quotes/stream
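For context, SSE is a simple line-oriented text protocol: each event is a block of `field: value` lines, where `data:` carries the payload, and a blank line terminates the event. A minimal, self-contained sketch of pulling the payloads out of a raw stream (the quote JSON shown is hypothetical, not the app's actual schema):

```java
import java.util.ArrayList;
import java.util.List;

public class SseParser {
    // Extract the payload of each "data:" line from a raw SSE stream.
    // Events are separated by blank lines; other SSE fields
    // (event:, id:, retry:) are simply skipped in this sketch.
    public static List<String> dataPayloads(String raw) {
        List<String> payloads = new ArrayList<>();
        for (String line : raw.split("\n")) {
            if (line.startsWith("data:")) {
                payloads.add(line.substring("data:".length()).trim());
            }
        }
        return payloads;
    }

    public static void main(String[] args) {
        String stream = "data: {\"name\":\"NFLX\",\"bid\":101.5,\"ask\":101.7}\n"
                + "\n"
                + "data: {\"name\":\"RHT\",\"bid\":55.1,\"ask\":55.3}\n"
                + "\n";
        System.out.println(dataPayloads(stream)); // prints both JSON payloads
    }
}
```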
So, first challenge: how might we consume these SSEs using Flink? I found a handy AWS Kinesis SSE demo from which I snarfed the SSE/OkHttp code. I wired this into Flink's RichSourceFunction:
```java
@Override
public void run(SourceContext<String> sourceContext) throws Exception {
    OkHttpClient client = getUnsafeOkHttpClient();
    Request.Builder requestBuilder = new Request.Builder();
    // Create a request and connect using the standard headers for SSE endpoints
    Request request = requestBuilder
            .url(url)
            .header("Accept-Encoding", "")
            .header("Accept", "text/event-stream")
            .header("Cache-Control", "no-cache")
            .build();
    logger.info("SSESource Request created to: " + url);
    EventSourceListener listener = new EventSourceSender(sourceContext, logger, null);
    final EventSource eventSource = EventSources.createFactory(client).newEventSource(request, listener);
    logger.info("SSESource connected");
    try {
        long startTime = System.currentTimeMillis();
        // While we are connected and running we need to hold this thread and
        // report messages received if that option is enabled.
        // SSE events are sent via a callback in another thread.
        Thread.sleep(10000000);
    } catch (InterruptedException e) {
        logger.error("Sleep timer interrupted");
    }
    eventSource.cancel();
    logger.info("SSESource Stopping event source");
}
```
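One detail worth calling out: run() deliberately parks its own thread, because OkHttp delivers the SSE events on its dispatcher thread, and the listener pushes them to the source context from there. A self-contained sketch of that callback-to-consumer handoff pattern, with a BlockingQueue and stand-in names replacing the OkHttp and Flink types (the real listener class, EventSourceSender, is in the repo):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CallbackHandoff {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Stands in for EventSourceListener.onEvent(), which OkHttp
    // invokes on its own dispatcher thread for each SSE event.
    public void onEvent(String data) {
        queue.offer(data);
    }

    // Stands in for the consuming side, where a Flink source
    // would call sourceContext.collect() for each element.
    public String take() throws InterruptedException {
        return queue.take();
    }

    public static void main(String[] args) throws Exception {
        CallbackHandoff handoff = new CallbackHandoff();
        Thread callbackThread = new Thread(() -> handoff.onEvent("{\"name\":\"NFLX\"}"));
        callbackThread.start();
        System.out.println(handoff.take()); // blocks until the callback fires
        callbackThread.join();
    }
}
```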
So now I could consume this SSE source as a DataStream:
```java
DataStream<Quote> nflxPrices = createSourceFromApplicationProperties(env, "NFLX");
DataStream<Quote> rhtPrices = createSourceFromApplicationProperties(env, "RHT");
```
In the example, I wire in the stock quotes for NFLX and RHT. Next step: process these streams. Since I am new to Flink, I started with a simple print function, then read this stock price example from 2015! Cool. So I implemented a simple BuyFunction class that makes stock buy recommendations:
```java
public class BuyFunction extends KeyedProcessFunction<String, Quote, Buy> {

    Logger logger = LoggerFactory.getLogger(StockBuyer.class);

    private transient ValueState<Boolean> shouldBuy;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        shouldBuy = getRuntimeContext().getState(flagDescriptor);
    }

    @Override
    public void processElement(Quote quote, Context context, Collector<Buy> collector) throws Exception {
        // Get the current state for the current key
        Boolean lastBuyRecommendation = shouldBuy.value();
        if (quote.getBid() > quote.getAsk()) {
            // Set the flag to true
            shouldBuy.update(true);
            // Buy Low, Sell High!
            // If you want to buy a share, you have to pay the ask price.
            // If you want to sell shares, you'll receive the bid price.
            // Only buy if there are two buy recommendations in a row:
            //if (lastBuyRecommendation != null && lastBuyRecommendation) {
            Buy buy = new Buy();
            buy.setQuote(quote);
            collector.collect(buy);
            //}
        } else {
            shouldBuy.update(false);
        }
    }
}
```
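The buy condition deserves a second look. Normally the ask sits at or above the bid; bid above ask is a "crossed market", meaning a buyer is willing to pay more than the lowest offer, which this demo treats as a toy buy signal. The decision logic in isolation, with a minimal hypothetical POJO matching the getters above:

```java
public class CrossedMarket {
    // Minimal stand-in for the Quote POJO used in the Flink job.
    static class Quote {
        final double bid;   // highest price a buyer will currently pay
        final double ask;   // lowest price a seller will currently accept
        Quote(double bid, double ask) { this.bid = bid; this.ask = ask; }
    }

    // A crossed market (bid above ask) is the toy buy signal
    // that BuyFunction's processElement() checks for.
    static boolean shouldBuy(Quote q) {
        return q.bid > q.ask;
    }

    public static void main(String[] args) {
        System.out.println(shouldBuy(new Quote(101.7, 101.5))); // true: crossed
        System.out.println(shouldBuy(new Quote(101.5, 101.7))); // false: normal spread
    }
}
```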
Lastly, it needs to be sent to a sink. Again, I started by using a simple print sink:
```java
nflx
    .addSink(new BuySink())
    //.addSink(new PulsarBuySink("NFLX"))
    .name("NFLX buy");
```
Friends of mine have been telling me how much more awesome Pulsar is compared to Kafka, so I also tried sending to a local Pulsar container that you can run using:
podman run -it -p 6650:6650 -p 8081:8080 --rm --name pulsar docker.io/apachepulsar/pulsar:2.10.2 bin/pulsar standalone
And forwarded the buys to Pulsar using a simple utility class built on the Pulsar Java client:
```java
public class PulsarBuySink implements SinkFunction<Buy> {
```
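A sink like this is mostly lifecycle plumbing: create the producer once, send on each invoke(), and close cleanly when the job shuts down. A self-contained sketch of that shape, with a stand-in producer interface in place of the real org.apache.pulsar.client.api client (names here are illustrative, not the repo's actual code):

```java
import java.util.ArrayList;
import java.util.List;

public class SinkShape {
    // Stand-in for a Pulsar producer; a real sink would hold
    // an org.apache.pulsar.client.api.Producer instead.
    interface Producer {
        void send(String message);
        void close();
    }

    // Test double that records what was sent.
    static class RecordingProducer implements Producer {
        final List<String> sent = new ArrayList<>();
        public void send(String message) { sent.add(message); }
        public void close() { }
    }

    private final Producer producer;

    SinkShape(Producer producer) {
        this.producer = producer;   // created once, reused for every element
    }

    // Mirrors SinkFunction.invoke(): called once per Buy in the stream.
    void invoke(String buyAsJson) {
        producer.send(buyAsJson);
    }

    void close() {
        producer.close();
    }

    public static void main(String[] args) {
        RecordingProducer p = new RecordingProducer();
        SinkShape sink = new SinkShape(p);
        sink.invoke("{\"quote\":{\"name\":\"NFLX\"}}");
        sink.close();
        System.out.println(p.sent.size()); // 1
    }
}
```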
Then consume the messages to make sure they are there!
podman exec -i pulsar bin/pulsar-client consume -s my-subscription -n 0 persistent://public/default/orders
And I need to write this post as well .. and get it running on OpenShift …
Source code is here - https://github.com/eformat/flink-stocks