Java 8 One Stream To Multiple Map







Let's say I have a huge webserver log file that does not fit in memory. I need to stream this file through a map/reduce-style method and save the results to a database. I do this using the Java 8 Stream API. For example, after the map/reduce process I get aggregates such as consumption by client, consumption by IP, and consumption by content. My actual needs are not exactly those in this example; since I cannot share code, I just want to give a basic example.



Using the Java 8 Stream API, I want to read the file exactly once and get 3 lists at the same time while streaming, either in parallel or sequentially. Parallel would be preferable. Is there any way to do that?





Only with a custom collector. Java 12 has a proposal to implement something like a BiCollector (the name is not final yet), but definitely not a TriCollector...
– Eugene
Aug 13 at 8:37
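For reference, that proposal did land in JDK 12 as Collectors.teeing, which feeds every element to two downstream collectors and merges their results. A minimal sketch (not from this thread; the class name TeeingDemo and the sample data are made up) showing how nesting two teeing calls yields three results from a single pass:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TeeingDemo {
    // Collect the stream exactly once into three results:
    // its count, the sum of its elements, and the elements themselves.
    static List<Object> countSumList() {
        return Stream.of(1, 2, 3, 4)
            .collect(Collectors.teeing(
                Collectors.counting(),
                // the second "branch" is itself a teeing collector,
                // which is how you get a third result out of a pair API
                Collectors.teeing(
                    Collectors.summingInt(Integer::intValue),
                    Collectors.toList(),
                    (sum, list) -> List.of(sum, list)),
                (count, rest) -> List.of(count, rest.get(0), rest.get(1))));
    }

    public static void main(String[] args) {
        System.out.println(countSumList()); // [4, 10, [1, 2, 3, 4]]
    }
}
```

The stream is traversed once; only the collectors are duplicated, so this also works on a parallel stream.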








You can check out this question. In my answer, I use a custom spliterator to wrap the stream. It works in parallel (you can simplify it if you do not need the consumers to run in sync), but I don't know if it can easily be applied to reductions.
– Malte Hartwig
Aug 13 at 8:38






What you should be aware of is that lines from a file are by definition hard to parallelize, so IIRC there will be an internal buffer of 1024 lines initially, then increased to 2048, then 3072, and so on... so if your file is smaller than 1024 lines, parallel processing will be much worse than sequential.
– Eugene
Aug 13 at 8:41






Actually, I am getting the source from a NoSQL database. I just wanted to keep the example simple. Thanks for your reply @Eugene
– Yılmaz
Aug 13 at 8:43





There's a proposal for a future JDK for this sort of thing - see the extensive discussion at marxsoftware.blogspot.com/2018/08/…. I think you'll find some code there doing something similar to what you want (IIRC it will stream into 2 collectors, but I'm sure you could expand further)
– Brian Agnew
Aug 13 at 8:52




2 Answers



I have adapted the answer to this question to your case. The custom Spliterator will "split" the stream into multiple streams that collect by different properties:


import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

@SafeVarargs
public static <T> long streamForked(Stream<T> source, Consumer<Stream<T>>... consumers)
{
    return StreamSupport.stream(new ForkingSpliterator<>(source, consumers), false).count();
}

public static class ForkingSpliterator<T>
        extends AbstractSpliterator<T>
{
    private Spliterator<T> sourceSpliterator;

    private List<BlockingQueue<T>> queues = new ArrayList<>();

    private boolean sourceDone;

    @SafeVarargs
    private ForkingSpliterator(Stream<T> source, Consumer<Stream<T>>... consumers)
    {
        super(Long.MAX_VALUE, 0);

        sourceSpliterator = source.spliterator();

        // one queue and one consumer thread per fork
        for (Consumer<Stream<T>> fork : consumers)
        {
            LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
            queues.add(queue);
            new Thread(() -> fork.accept(StreamSupport.stream(new ForkedConsumer(queue), false))).start();
        }
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        // copy each source element into every fork's queue
        sourceDone = !sourceSpliterator.tryAdvance(t -> queues.forEach(queue -> queue.offer(t)));
        return !sourceDone;
    }

    private class ForkedConsumer
            extends AbstractSpliterator<T>
    {
        private BlockingQueue<T> queue;

        private ForkedConsumer(BlockingQueue<T> queue)
        {
            super(Long.MAX_VALUE, 0);
            this.queue = queue;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action)
        {
            // busy-wait until an element arrives or the source is exhausted
            while (queue.peek() == null)
            {
                if (sourceDone)
                {
                    // queue is empty and there won't be any more elements, so "terminate" this sub-stream
                    return false;
                }
            }

            // push to consumer pipeline
            action.accept(queue.poll());

            return true;
        }
    }
}

You can use it as follows:


streamForked(Stream.of(new Row("content1", "client1", "location1", 1),
new Row("content2", "client1", "location1", 2),
new Row("content1", "client1", "location2", 3),
new Row("content2", "client2", "location2", 4),
new Row("content1", "client2", "location2", 5)),
rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient,
Collectors.groupingBy(Row::getContent,
Collectors.summingInt(Row::getConsumption))))),
rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient,
Collectors.groupingBy(Row::getLocation,
Collectors.summingInt(Row::getConsumption))))),
rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getContent,
Collectors.groupingBy(Row::getLocation,
Collectors.summingInt(Row::getConsumption))))));

// Output
// {client2={location2=9}, client1={location1=3, location2=3}}
// {client2={content2=4, content1=5}, client1={content2=2, content1=4}}
// {content2={location1=2, location2=4}, content1={location1=1, location2=8}}



Note that you can do pretty much anything you want with the copies of the stream. As per your example, I used a nested groupingBy collector to group the rows by two properties and then summed up the int property, so the result is a Map<String, Map<String, Integer>>. But you could also use it for other scenarios:




rows -> System.out.println(rows.count())
rows -> rows.forEach(row -> System.out.println(row))
rows -> System.out.println(rows.anyMatch(row -> row.getConsumption() > 3))





I tried your method, and I feel that we are on the same page, but your code is beyond my Java skills. I created a GitHub project. Could you check my project, please? github.com/ftylmz1/java-stream-multiple-grouping
– Yılmaz
Aug 13 at 12:15






@Yılmaz I have updated my code now that I understand what you want to do. You should be able to copy and paste it (you might have to remove some 'static' modifiers from the classes; I wrote it all in one file).
– Malte Hartwig
Aug 13 at 13:26





Thank you very much for your answer @Malte Hartwig. I implemented and tested your code, and it works for me. The usage is just what I wanted. Thank you again :)
– Yılmaz
Aug 13 at 14:06





@MalteHartwig +1, I did not even have the time to open that GitHub project, but you... nice!
– Eugene
Aug 13 at 18:45





@MalteHartwig how can I make the streamForked method synchronize with the three map/reduce methods? I mean, only after completing those three jobs should streamForked proceed to the next line.
– Yılmaz
Aug 14 at 11:58
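One way to get that behaviour (a sketch under assumptions, not code from this thread): keep references to the fork threads that the ForkingSpliterator constructor starts, and Thread.join() them after the source is drained, so streamForked only returns once every fork has finished. The stand-in below demonstrates just the join pattern with dummy worker jobs; the class name ForkJoinDemo and the per-worker work are made up:

```java
import java.util.ArrayList;
import java.util.List;

public class ForkJoinDemo {
    // Run three hypothetical "fork" jobs and block until all of them have finished.
    static int runAndWait() throws InterruptedException {
        int[] results = new int[3];
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            final int idx = i;
            Thread t = new Thread(() -> {
                int sum = 0;
                for (int v = 1; v <= 4; v++) sum += v; // stand-in for one fork's map/reduce
                results[idx] = sum;
            });
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            t.join(); // does not return until this worker has terminated
        }
        // safe to read: join() guarantees every worker is done
        return results[0] + results[1] + results[2];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndWait()); // 30
    }
}
```

A CountDownLatch that each fork counts down when its consumer returns would work just as well if you do not want to hold on to the Thread objects.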



Generally, collecting to anything other than what the standard API gives you is pretty easy via a custom Collector. In your case, collecting into 3 lists at a time (just a small example that compiles, since you can't share your code either):




import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

private static <T> Collector<T, ?, List<List<T>>> to3Lists()
{
    class Acc
    {
        List<T> left = new ArrayList<>();

        List<T> middle = new ArrayList<>();

        List<T> right = new ArrayList<>();

        List<List<T>> list = Arrays.asList(left, middle, right);

        void add(T elem)
        {
            // obviously do whatever you want here
            left.add(elem);
            middle.add(elem);
            right.add(elem);
        }

        Acc merge(Acc other)
        {
            // combine per-thread accumulators when the stream is parallel
            left.addAll(other.left);
            middle.addAll(other.middle);
            right.addAll(other.right);

            return this;
        }

        public List<List<T>> finisher()
        {
            return list;
        }
    }

    return Collector.of(Acc::new, Acc::add, Acc::merge, Acc::finisher);
}


And using it via:


Stream.of(1, 2, 3)
.collect(to3Lists());



Obviously this custom collector does not do anything useful; it is just an example of how you could work with it.
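To see the merge step actually exercised, here is a sketch that runs a collector of this shape on a parallel stream (the collector body is reproduced so the snippet is self-contained; the class name To3ListsDemo is made up):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public class To3ListsDemo {
    static <T> Collector<T, ?, List<List<T>>> to3Lists() {
        class Acc {
            List<T> left = new ArrayList<>();
            List<T> middle = new ArrayList<>();
            List<T> right = new ArrayList<>();
            List<List<T>> list = Arrays.asList(left, middle, right);

            void add(T elem) {            // accumulator: add to all three lists
                left.add(elem);
                middle.add(elem);
                right.add(elem);
            }

            Acc merge(Acc other) {        // combiner: stitch per-thread accumulators together
                left.addAll(other.left);
                middle.addAll(other.middle);
                right.addAll(other.right);
                return this;
            }

            List<List<T>> finisher() {
                return list;
            }
        }
        return Collector.of(Acc::new, Acc::add, Acc::merge, Acc::finisher);
    }

    static List<List<Integer>> collectParallel() {
        // encounter order is preserved even in parallel, because the combiner
        // is applied to adjacent chunks left-to-right
        return IntStream.rangeClosed(1, 100).boxed()
                .parallel()
                .collect(to3Lists());
    }

    public static void main(String[] args) {
        System.out.println(collectParallel().get(0).size()); // 100
    }
}
```

Each of the three inner lists ends up holding 1..100 in order, which shows that merge() was invoked correctly on the parallel chunks.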





Thank you for your response. I tried it, but it is hard for me to implement this for my case. Could you help me with this? What kind of custom class can I write to pass to this method? Stream.of(1, 2, 3).collect(Collectors.toMap(g-> return null; , v -> return null; , (t, u) -> return t; ));
– Yılmaz
Aug 13 at 9:29





@Yılmaz I would help you, if you could help me... the piece of code you wrote makes very very little sense
– Eugene
Aug 13 at 9:31





I created github project. This is the link github.com/ftylmz1/java-stream-multiple-grouping
– Yılmaz
Aug 13 at 12:13






