RxJava is an extremely useful streaming framework (here is an example application that uses it to process RESTful calls to both Uber and Lyft in parallel (RT_UBER_NYC_TAXI)). In this post, I will cover how you can reactively stream and process a CSV file.
First, you can create a Flowable of CSVRecord (commons-csv) by wrapping a CSVParser, whose records can be iterated, with Flowable.fromIterable(). Next, we want resource usage to be safe, i.e. we don't want to leave file handles open, so we use the resource-safe method Flowable.using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceDisposer). The first argument opens the resource, the second builds the Flowable from it, and the last disposes of the resource once the flow terminates or is cancelled.
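// Imports assume RxJava 2.x (io.reactivex) and Apache Commons CSV.
import io.reactivex.Flowable;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

public static Flowable<CSVRecord> readRecordsFromFile(Path inputFile, CSVFormat csvFormat) {
    return Flowable.using(
        () -> Files.newBufferedReader(inputFile),   // open the file when subscribed
        bufferedReader -> csvRecordFlowable(bufferedReader, csvFormat.withHeader()),
        BufferedReader::close                       // always release the file handle
    );
}

private static Flowable<CSVRecord> csvRecordFlowable(BufferedReader br, CSVFormat csvFormat) {
    try {
        // CSVParser is an Iterable<CSVRecord>, so it can be passed directly.
        final CSVParser csvParser = new CSVParser(br, csvFormat);
        return Flowable.fromIterable(csvParser);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}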
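The acquire, use, dispose contract that Flowable.using provides can also be illustrated without RxJava. The following is only a minimal sketch of the same idea using java.util.stream and try-with-resources, not the RxJava implementation. The class and method names (ResourceSafeLines, lines, readAll, closedAfterRead) are hypothetical, the CSV is read from an in-memory string, and the header is skipped naively instead of being parsed by commons-csv.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration class; not part of RxJava or commons-csv.
class ResourceSafeLines {

    // Lazily streams lines; the reader is closed when the stream is closed.
    // The flag only exists so the sketch can observe that disposal happened.
    static Stream<String> lines(BufferedReader reader, AtomicBoolean closed) {
        return reader.lines().onClose(() -> {
            try {
                reader.close();
                closed.set(true);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    // Returns the data rows (everything after the header line).
    static List<String> readAll(String csv) {
        return readAll(csv, new AtomicBoolean(false));
    }

    static List<String> readAll(String csv, AtomicBoolean closed) {
        BufferedReader reader = new BufferedReader(new StringReader(csv));
        // try-with-resources plays the role of the resource disposer:
        // the close handler runs whether collection succeeds or throws.
        try (Stream<String> rows = lines(reader, closed)) {
            return rows.skip(1).collect(Collectors.toList());
        }
    }

    // Reports whether the reader was closed once reading finished.
    static boolean closedAfterRead(String csv) {
        AtomicBoolean closed = new AtomicBoolean(false);
        readAll(csv, closed);
        return closed.get();
    }
}
```

As with Flowable.using, the caller never closes the reader explicitly; disposal is tied to the lifetime of the sequence itself.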
This nicely sets up a Flowable that can then be processed in different ways, with all the usual Flowable features such as backpressure. Example usage is shown below.
readRecordsFromFile(Paths.get("sample.csv"), CSVFormat.DEFAULT)
    .parallel()
    .runOn(Schedulers.io())
    .filter(record -> true)    // filter here
    .map(record -> record)     // map here
    .sequential()
    .subscribe(new Subscriber<CSVRecord>() {
        private Subscription sub;

        @Override
        public void onSubscribe(Subscription s) {
            sub = s;
            // nothing is emitted until the first item is requested
            sub.request(1);
        }

        @Override
        public void onNext(CSVRecord record) {
            // do something here
            // request next item
            sub.request(1);
        }

        @Override
        public void onError(Throwable t) {
            // handle error
        }

        @Override
        public void onComplete() {
        }
    });
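The request-one-item-at-a-time pattern in the subscriber above comes from the Reactive Streams protocol, which the JDK mirrors in java.util.concurrent.Flow (Java 9+). The sketch below shows that protocol using only the standard library, with SubmissionPublisher standing in for the Flowable and plain strings standing in for CSVRecord. The class names OneAtATimeDemo and OneAtATimeSubscriber are hypothetical, and this is an illustration of the request(1) handshake rather than RxJava code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; uses the JDK Flow API, not RxJava.
class OneAtATimeDemo {

    static class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // without this first request nothing is delivered
        }

        @Override
        public void onNext(String row) {
            received.add(row);       // process the row
            subscription.request(1); // then ask for exactly one more
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes the rows and returns them in the order the subscriber saw them.
    static List<String> run(List<String> rows) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            rows.forEach(publisher::submit);
        } // close() signals onComplete once the submitted rows are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }
}
```

Requesting one item per onNext keeps at most one row in flight on the consumer side, which is the simplest form of backpressure; requesting larger batches trades that tight bound for fewer round trips.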