How to add a new event to a Flink CEP data stream?
I am using Flink 1.5.2 to solve a CEP problem.
My data comes from a List; another process adds new Event objects to that list while the system is running. It is not a socket or a network message. I've been reading the example on the official site. Here are the steps I imagine I should follow.
But my input stream should be unbounded. I couldn't find an add() method on the DataStream<> object. How do I accomplish this? Also, do I need to tell DataStream<> when to clean up obsolete events?
1 Answer
Collections are only suitable as an input source for Flink when working with a bounded input set that's fixed up front, as when writing a test or just experimenting. If you want an unbounded stream you will need to choose a different source, such as a socket or a message queuing system like Kafka.
Sockets are easy to work with for experimentation. On Linux and MacOS systems you can use
nc -lk 9999
to create a socket that Flink can connect to on port 9999, and whatever you provide as input to nc (netcat) will be streamed into your Flink job one line at a time. Netcat is also available for Windows, but isn't pre-installed.
However, you shouldn't plan to use sockets in production, as they can't be rewound (which is crucial for achieving accurate results with Flink during failure recovery).
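On machines where netcat isn't installed, a small plain-Java stand-in can play a similar role for experimentation. The following is only a sketch: the class name LineServer and the serve helper are hypothetical, it assumes a Flink socket source connects to localhost on port 9999, and unlike nc -lk it serves a single client and then exits.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class LineServer {

    // Accepts one client on the given server socket and forwards every
    // line read from `input` to that client, terminated by '\n'.
    static void serve(ServerSocket server, BufferedReader input) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = input.readLine()) != null) {
                out.print(line + "\n"); // explicit '\n' so the delimiter is the same on every OS
                out.flush();            // push each line out immediately
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 9999 matches the nc example above; stdin plays the role of nc's input.
        try (ServerSocket server = new ServerSocket(9999);
             BufferedReader stdin = new BufferedReader(
                     new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            serve(server, stdin);
        }
    }
}
```

Start it before the Flink job, then type lines into its console; each line is sent to the connected client as soon as you press Enter. The same caveat applies as for nc: this is for experimentation only, since the input cannot be rewound.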
I have implemented a SourceFunction<> to test the flow. I add the source with env.addSource(). The run() method in my SourceFunction<> has a while loop that calls context.collect() to emit elements. Finally, I call env.execute(). But the run() method is never called. I must have misunderstood how Flink CEP works. Is there a document that explains how to use the CEP library to monitor a data stream?
– Maxi Wu
Aug 13 at 5:56
I found the problem. I am using a loop to generate test events and feed them to my custom source, but I have a Thread.sleep(t) in the generator. When Thread.sleep(t) is called, the whole program gets stuck. Maybe Thread.sleep() conflicts with env.execute(). Is there anything I should handle in a multi-threaded source?
– Maxi Wu
Aug 13 at 7:00
Oh, I was wrong. The new thread is running correctly; the real problem is the queue object. SourceFunction.run() gets a queue with no elements, but the queue object I am adding elements to is not empty. I don't know why there are two queues when I have only one source object.
– Maxi Wu
Aug 13 at 7:07
I am guessing the source object needs to be serialized and then deserialized before run(). That's why they are different instances.
– Maxi Wu
Aug 14 at 2:59
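The effect guessed at in the last comment can be reproduced without Flink. The sketch below is an illustration only: QueueHolder is a hypothetical stand-in for a custom source that keeps a queue in an instance field, and roundTrip uses plain Java serialization to mimic a function object being copied before it runs. It is not Flink's actual code path.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ConcurrentLinkedQueue;

public class SerializedCopyDemo {

    // Hypothetical stand-in for a source object that holds its queue as a field.
    static class QueueHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    }

    // Serializes the object to bytes and reads it back, producing an independent copy.
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        QueueHolder original = new QueueHolder();
        QueueHolder copy = roundTrip(original); // copy is taken while the queue is still empty
        original.queue.add("event-1");          // added to the original after the copy was made

        System.out.println("original size = " + original.queue.size());
        System.out.println("copy size     = " + copy.queue.size());
        System.out.println("same queue?     " + (original.queue == copy.queue));
    }
}
```

Elements added to the original queue after the copy was made never reach the copy, which matches the empty queue seen inside run(). This is one reason the answer's advice to read from an external system, such as a socket or a message queue, is more robust than sharing an in-memory collection with the source.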
I see. I do have an MQTT broker, but it uses SSL. I will try to use Flink to connect to that broker. Thanks.
– Maxi Wu
Aug 13 at 1:37