Spring Batch - Using an ItemWriter with List of Lists

Typically, the design pattern is:

Reader -> reads something, returns ReadItem
Processor -> ingests ReadItem, returns ProcessedItem
Writer -> ingests List<ProcessedItem>

If your processor is returning List<Object>, then you need your Writer to expect List<List<Object>>.

You could do this by wrapping your JdbcBatchItemWriter as a delegate in an ItemWriter that looks something like this:

public class ListUnpackingItemWriter<T> implements ItemWriter<List<T>>, ItemStream, InitializingBean {

    private ItemWriter<T> delegate;

    @Override
    public void write(final List<? extends List<T>> lists) throws Exception {
        // Flatten the list of lists into a single list before delegating.
        final List<T> consolidatedList = new ArrayList<>();
        for (final List<T> list : lists) {
            consolidatedList.addAll(list);
        }
        delegate.write(consolidatedList);
    }

    @Override
    public void afterPropertiesSet() {
        Assert.notNull(delegate, "You must set a delegate!");
    }

    @Override
    public void open(ExecutionContext executionContext) {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).open(executionContext);
        }
    }

    @Override
    public void update(ExecutionContext executionContext) {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).update(executionContext);
        }
    }

    @Override
    public void close() {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).close();
        }
    }

    public void setDelegate(ItemWriter<T> delegate) {
        this.delegate = delegate;
    }
}

Writing List of Items using JdbcBatchItemWriter

Your example is not correct. You are creating a JpaItemWriter inside the write method, so a new instance is created on each call to write. This is probably the cause of your performance issue.

More importantly, the lifecycle methods of the delegate writer (open/update/close) will not be honored. This does not matter for JpaItemWriter, which does not implement ItemStream, but it would be a problem if the delegate were an item stream. Your MyItemWriter implementation should look something like this:

public class MyItemWriter implements ItemWriter<List<MyDomainObject>> {

    private final JpaItemWriter<MyDomainObject> jpaItemWriter;

    public MyItemWriter(JpaItemWriter<MyDomainObject> jpaItemWriter) {
        this.jpaItemWriter = jpaItemWriter;
    }

    @Override
    public void write(List<? extends List<MyDomainObject>> items) throws Exception {
        // Delegate each inner list to the (pre-configured) JpaItemWriter.
        for (List<MyDomainObject> list : items) {
            this.jpaItemWriter.write(list);
        }
    }
}

Now if you want to use the JdbcBatchItemWriter to write a list of lists, see Spring Batch - Using an ItemWriter with List of Lists.

Edit: Added sample code showing how to set the delegate, as requested in the comments:

@Bean
public ListUnpackingItemWriter<MyDomainObject> itemWriter() {
    JdbcBatchItemWriter<MyDomainObject> jdbcBatchItemWriter = null; // configure your jdbcBatchItemWriter
    ListUnpackingItemWriter<MyDomainObject> listUnpackingItemWriter = new ListUnpackingItemWriter<>();
    listUnpackingItemWriter.setDelegate(jdbcBatchItemWriter);
    return listUnpackingItemWriter;
}
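For completeness, here is a hedged sketch of how such a writer could be wired into a chunk-oriented step. The bean names, the chunk size, and the `ReadItem`/`MyDomainObject` types are illustrative placeholders, not part of the original answer; it uses the `StepBuilderFactory` API from Spring Batch 3/4:

```java
// Illustrative step configuration; replace ReadItem/MyDomainObject with your types.
@Bean
public Step myStep(StepBuilderFactory stepBuilderFactory,
                   ItemReader<ReadItem> reader,
                   ItemProcessor<ReadItem, List<MyDomainObject>> processor,
                   ListUnpackingItemWriter<MyDomainObject> itemWriter) {
    return stepBuilderFactory.get("myStep")
            // The chunk's output type is List<MyDomainObject>: each "item" the
            // writer receives is the list the processor produced for one read item.
            .<ReadItem, List<MyDomainObject>>chunk(10)
            .reader(reader)
            .processor(processor)
            .writer(itemWriter)
            .build();
}
```

Note the chunk declaration: because the processor emits a list per item, the writer's generic type must be the list type, which is exactly what `ListUnpackingItemWriter<MyDomainObject>` (an `ItemWriter<List<MyDomainObject>>`) provides.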

Spring Batch 3 Custom Writer with a List

Your writer would stay as is but be wrapped by the ListUnpackingItemWriter.

// NOTE: this will now be the delegate used by the delegating writer.
// It will be injected into the delegating writer.
@Bean(name = "MyDelegateWriter")
@SphynxBatchStepScope
public FlatFileItemWriter<Menu> myWriter(String fileOutputName) {
    FlatFileItemWriter<Menu> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource("test.txt"));
    DelimitedLineAggregator<Menu> delLineAgg = new DelimitedLineAggregator<>();
    delLineAgg.setDelimiter(" ");
    BeanWrapperFieldExtractor<Menu> fieldExtractor = new BeanWrapperFieldExtractor<>();
    fieldExtractor.setNames(new String[] {"id", "name", "telephone"});
    delLineAgg.setFieldExtractor(fieldExtractor);
    writer.setLineAggregator(delLineAgg);
    writer.setHeaderCallback(header());
    writer.setFooterCallback(footer());
    writer.setEncoding("UTF-8");

    return writer;
}

// NOTE: now this "MyWriter" will be used by your job
@Bean(name = "MyWriter")
@SphynxBatchStepScope
public ListUnpackingItemWriter<Menu> myListWriter(@Qualifier("MyDelegateWriter") FlatFileItemWriter<Menu> delegate) {
    ListUnpackingItemWriter<Menu> writer = new ListUnpackingItemWriter<>();
    writer.setDelegate(delegate);
    return writer;
}

Spring boot batch custom ItemWriter with a list of items

In chunk-based batch processing, the reader and processor read/process a single record at a time; the framework buffers the results and sends them all to the writer once the chunk size has been reached. If you look at the signature of ItemWriter#write, it is List<? extends T> items, which means it accepts the list of items returned by the processor. In your case, your processor returns List<ProcessedItem>, so your writer should expect List<List<ProcessedItem>>. To fix your issue, change MyCustomWriter's write method as follows:

@Override
public void write(List<? extends List<ProcessedItem>> items) throws Exception {
    // Each element of "items" is the List<ProcessedItem> the processor
    // produced for one read item; flatten before handling each instance.
    items.stream().flatMap(Collection::stream).forEach(data -> {
        // data is a ProcessedItem instance
    });
}
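The flatMap step can be checked in isolation with plain collections, independent of Spring Batch (class and method names here are for demonstration only):

```java
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

public class FlattenDemo {
    // Flattens a list of lists into a single list, which is exactly what
    // the writer above does before handling each element.
    static <T> List<T> flatten(List<? extends List<T>> lists) {
        return lists.stream()
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> chunks = List.of(List.of("a", "b"), List.of("c"));
        System.out.println(flatten(chunks)); // [a, b, c]
    }
}
```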

What is the best way to flatten a list of lists in Spring Batch?

The processor can return the List just fine, but supposing I want to work with the elements of this list as individuals in subsequent processors. Am I expected to write them to the database first?

No need to write them to the database first; that would be inefficient. Encapsulation is your friend here: you can wrap the result of your processor in an aggregate type that is handed to subsequent processors in the chain (using a composite processor, for instance). The item writer is then responsible for the flat-map operation, unwrapping fully processed items from the aggregate type before writing them.
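As a rough illustration of the aggregate-type idea, here is a plain-Java sketch in which simple functions stand in for Spring Batch's ItemProcessor chain; the `Batch` record and both operators are hypothetical names, not from any library:

```java
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

// Hypothetical aggregate type carrying the list through the processor chain.
record Batch(List<String> items) {}

public class AggregateDemo {
    // Each "processor" takes the aggregate and returns a new aggregate,
    // so later processors can keep working on the individual elements.
    static final UnaryOperator<Batch> upperCase = b -> new Batch(
            b.items().stream().map(String::toUpperCase).collect(Collectors.toList()));
    static final UnaryOperator<Batch> addSuffix = b -> new Batch(
            b.items().stream().map(s -> s + "!").collect(Collectors.toList()));

    // The "writer" unwraps the aggregate at the end of the chain.
    static List<String> write(Batch batch) {
        return batch.items();
    }

    public static void main(String[] args) {
        Batch processed = addSuffix.apply(upperCase.apply(new Batch(List.of("a", "b"))));
        System.out.println(write(processed)); // [A!, B!]
    }
}
```

In a real job, the two operators would be ItemProcessor beans combined with a CompositeItemProcessor, and the unwrapping would live in the item writer.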

Another technique is to use two concurrent steps with a staging area (where you would flatten items) as described in issue #2044. I implemented a PoC here with a blocking queue as staging area. In your case, the first step would process items and write the results in the queue, and the second step can read (flat) items from the queue, enrich them as necessary and write them where appropriate.
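A minimal, single-JVM sketch of the staging-area idea, with plain threads and a BlockingQueue standing in for the two concurrent steps (all names and the sentinel value are illustrative; in a real job these would be Spring Batch steps):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class StagingQueueDemo {

    private static final String POISON = "__END__"; // sentinel marking end of input

    // Runs "step 1" (flatten the lists into the staging queue) on a separate
    // thread while "step 2" (read flat items and enrich them) consumes.
    static List<String> runPipeline(List<List<String>> chunks) throws InterruptedException {
        BlockingQueue<String> staging = new LinkedBlockingQueue<>();

        Thread step1 = new Thread(() -> {
            chunks.forEach(list -> list.forEach(staging::add));
            staging.add(POISON);
        });
        step1.start();

        List<String> enriched = new ArrayList<>();
        while (true) {
            String item = staging.take();
            if (item.equals(POISON)) {
                break;
            }
            enriched.add(item + "-enriched"); // stand-in for real enrichment
        }
        step1.join();
        return enriched;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runPipeline(List.of(List.of("a", "b"), List.of("c"))));
        // prints [a-enriched, b-enriched, c-enriched]
    }
}
```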


