|
| 1 | +:batch-asciidoc: ./ |
| 2 | +:toc: left |
| 3 | +:toclevels: 4 |
| 4 | + |
| 5 | +[[processor]] |
| 6 | +== ItemProcessor |
| 7 | + |
| 8 | +ifndef::onlyonetoggle[] |
| 9 | +include::toggle.adoc[] |
| 10 | +endif::onlyonetoggle[] |
| 11 | + |
| 12 | +The `ItemReader` and `ItemWriter` interfaces are both very useful for their specific |
| 13 | +tasks, but what if you want to insert business logic before writing? One option for both |
| 14 | +reading and writing is to use the composite pattern: Create an `ItemWriter` that contains |
| 15 | +another `ItemWriter` or an `ItemReader` that contains another `ItemReader`. The following |
| 16 | +code shows an example: |
| 17 | + |
| 18 | +[source, java] |
| 19 | +---- |
| 20 | +public class CompositeItemWriter<T> implements ItemWriter<T> { |
| 21 | +
|
| 22 | + ItemWriter<T> itemWriter; |
| 23 | +
|
| 24 | + public CompositeItemWriter(ItemWriter<T> itemWriter) { |
| 25 | + this.itemWriter = itemWriter; |
| 26 | + } |
| 27 | +
|
| 28 | + public void write(List<? extends T> items) throws Exception { |
| 29 | + //Add business logic here |
| 30 | + itemWriter.write(items); |
| 31 | + } |
| 32 | +
|
| 33 | + public void setDelegate(ItemWriter<T> itemWriter){ |
| 34 | + this.itemWriter = itemWriter; |
| 35 | + } |
| 36 | +} |
| 37 | +---- |
| 38 | + |
| 39 | +The preceding class contains another `ItemWriter` to which it delegates after having |
| 40 | +provided some business logic. This pattern could easily be used for an `ItemReader` as |
| 41 | +well, perhaps to obtain more reference data based upon the input that was provided by the |
| 42 | +main `ItemReader`. It is also useful if you need to control the call to `write` yourself. |
| 43 | +However, if you only want to 'transform' the item passed in for writing before it is |
| 44 | +actually written, you need not `write` yourself. You can just modify the item. For this |
| 45 | +scenario, Spring Batch provides the `ItemProcessor` interface, as shown in the following |
| 46 | +interface definition: |
| 47 | + |
| 48 | +[source, java] |
| 49 | +---- |
| 50 | +public interface ItemProcessor<I, O> { |
| 51 | +
|
| 52 | + O process(I item) throws Exception; |
| 53 | +} |
| 54 | +---- |
| 55 | + |
| 56 | +An `ItemProcessor` is simple. Given one object, transform it and return another. The |
| 57 | +provided object may or may not be of the same type. The point is that business logic may |
| 58 | +be applied within the process, and it is completely up to the developer to create that |
| 59 | +logic. An `ItemProcessor` can be wired directly into a step. For example, assume an |
| 60 | +`ItemReader` provides a class of type `Foo` and that it needs to be converted to type `Bar` |
| 61 | +before being written out. The following example shows an `ItemProcessor` that performs |
| 62 | +the conversion: |
| 63 | + |
| 64 | +[source, java] |
| 65 | +---- |
| 66 | +public class Foo {} |
| 67 | +
|
| 68 | +public class Bar { |
| 69 | + public Bar(Foo foo) {} |
| 70 | +} |
| 71 | +
|
| 72 | +public class FooProcessor implements ItemProcessor<Foo, Bar> { |
| 73 | + public Bar process(Foo foo) throws Exception { |
| 74 | + //Perform simple transformation, convert a Foo to a Bar |
| 75 | + return new Bar(foo); |
| 76 | + } |
| 77 | +} |
| 78 | +
|
| 79 | +public class BarWriter implements ItemWriter<Bar> { |
| 80 | + public void write(List<? extends Bar> bars) throws Exception { |
| 81 | + //write bars |
| 82 | + } |
| 83 | +} |
| 84 | +---- |
| 85 | + |
| 86 | +In the preceding example, there is a class `Foo`, a class `Bar`, and a class |
| 87 | +`FooProcessor` that adheres to the `ItemProcessor` interface. The transformation is |
| 88 | +simple, but any type of transformation could be done here. The `BarWriter` writes `Bar` |
| 89 | +objects, throwing an exception if any other type is provided. Similarly, the |
| 90 | +`FooProcessor` throws an exception if anything but a `Foo` is provided. The |
| 91 | +`FooProcessor` can then be injected into a `Step`, as shown in the following example: |
| 92 | + |
| 93 | +.XML Configuration |
| 94 | +[source, xml, role="xmlContent"] |
| 95 | +---- |
| 96 | +<job id="ioSampleJob"> |
| 97 | + <step name="step1"> |
| 98 | + <tasklet> |
| 99 | + <chunk reader="fooReader" processor="fooProcessor" writer="barWriter" |
| 100 | + commit-interval="2"/> |
| 101 | + </tasklet> |
| 102 | + </step> |
| 103 | +</job> |
| 104 | +---- |
| 105 | + |
| 106 | +.Java Configuration |
| 107 | +[source, java, role="javaContent"] |
| 108 | +---- |
| 109 | +@Bean |
| 110 | +public Job ioSampleJob() { |
| 111 | + return this.jobBuilderFactory.get("ioSampleJob") |
| 112 | + .start(step1()) |
| 113 | + .end() |
| 114 | + .build(); |
| 115 | +} |
| 116 | +
|
| 117 | +@Bean |
| 118 | +public Step step1() { |
| 119 | + return this.stepBuilderFactory.get("step1") |
| 120 | + .<String, String>chunk(2) |
| 121 | + .reader(fooReader()) |
| 122 | + .processor(fooProcessor()) |
| 123 | + .writer(barWriter()) |
| 124 | + .build(); |
| 125 | +} |
| 126 | +---- |
| 127 | + |
| 128 | +A difference between `ItemProcessor` and `ItemReader` or `ItemWriter` is that an `ItemProcessor` |
| 129 | +is optional for a `Step`. |
| 130 | + |
| 131 | +[[chainingItemProcessors]] |
| 132 | +==== Chaining ItemProcessors |
| 133 | + |
| 134 | +Performing a single transformation is useful in many scenarios, but what if you want to |
| 135 | +'chain' together multiple `ItemProcessor` implementations? This can be accomplished using |
| 136 | +the composite pattern mentioned previously. To update the previous, single |
| 137 | +transformation, example, `Foo` is transformed to `Bar`, which is transformed to `Foobar` |
| 138 | +and written out, as shown in the following example: |
| 139 | + |
| 140 | +[source, java] |
| 141 | +---- |
| 142 | +public class Foo {} |
| 143 | +
|
| 144 | +public class Bar { |
| 145 | + public Bar(Foo foo) {} |
| 146 | +} |
| 147 | +
|
| 148 | +public class Foobar { |
| 149 | + public Foobar(Bar bar) {} |
| 150 | +} |
| 151 | +
|
| 152 | +public class FooProcessor implements ItemProcessor<Foo, Bar> { |
| 153 | + public Bar process(Foo foo) throws Exception { |
| 154 | + //Perform simple transformation, convert a Foo to a Bar |
| 155 | + return new Bar(foo); |
| 156 | + } |
| 157 | +} |
| 158 | +
|
| 159 | +public class BarProcessor implements ItemProcessor<Bar, Foobar> { |
| 160 | + public Foobar process(Bar bar) throws Exception { |
| 161 | + return new Foobar(bar); |
| 162 | + } |
| 163 | +} |
| 164 | +
|
| 165 | +public class FoobarWriter implements ItemWriter<Foobar>{ |
| 166 | + public void write(List<? extends Foobar> items) throws Exception { |
| 167 | + //write items |
| 168 | + } |
| 169 | +} |
| 170 | +---- |
| 171 | + |
| 172 | +A `FooProcessor` and a `BarProcessor` can be 'chained' together to give the resultant |
| 173 | +`Foobar`, as shown in the following example: |
| 174 | + |
| 175 | + |
| 176 | +[source, java] |
| 177 | +---- |
| 178 | +CompositeItemProcessor<Foo,Foobar> compositeProcessor = |
| 179 | + new CompositeItemProcessor<Foo,Foobar>(); |
| 180 | +List itemProcessors = new ArrayList(); |
| 181 | +itemProcessors.add(new FooProcessor()); |
| 182 | +itemProcessors.add(new BarProcessor()); |
| 183 | +compositeProcessor.setDelegates(itemProcessors); |
| 184 | +---- |
| 185 | + |
| 186 | +Just as with the previous example, the composite processor can be configured into the |
| 187 | +`Step`: |
| 188 | + |
| 189 | +.XML Configuration |
| 190 | +[source, xml, role="xmlContent"] |
| 191 | +---- |
| 192 | +<job id="ioSampleJob"> |
| 193 | + <step name="step1"> |
| 194 | + <tasklet> |
| 195 | + <chunk reader="fooReader" processor="compositeItemProcessor" writer="foobarWriter" |
| 196 | + commit-interval="2"/> |
| 197 | + </tasklet> |
| 198 | + </step> |
| 199 | +</job> |
| 200 | +
|
| 201 | +<bean id="compositeItemProcessor" |
| 202 | + class="org.springframework.batch.item.support.CompositeItemProcessor"> |
| 203 | + <property name="delegates"> |
| 204 | + <list> |
| 205 | + <bean class="..FooProcessor" /> |
| 206 | + <bean class="..BarProcessor" /> |
| 207 | + </list> |
| 208 | + </property> |
| 209 | +</bean> |
| 210 | +---- |
| 211 | + |
| 212 | +.Java Configuration |
| 213 | +[source, java, role="javaContent"] |
| 214 | +---- |
| 215 | +@Bean |
| 216 | +public Job ioSampleJob() { |
| 217 | + return this.jobBuilderFactory.get("ioSampleJob") |
| 218 | + .start(step1()) |
| 219 | + .end() |
| 220 | + .build(); |
| 221 | +} |
| 222 | +
|
| 223 | +@Bean |
| 224 | +public Step step1() { |
| 225 | + return this.stepBuilderFactory.get("step1") |
| 226 | + .<String, String>chunk(2) |
| 227 | + .reader(fooReader()) |
| 228 | + .processor(compositeProcessor()) |
| 229 | + .writer(foobarWriter()) |
| 230 | + .build(); |
| 231 | +} |
| 232 | +
|
| 233 | +@Bean |
| 234 | +public CompositeItemProcessor compositeProcessor() { |
| 235 | + List<ItemProcessor> delegates = new ArrayList<>(2); |
| 236 | + delegates.add(new FooProcessor()); |
| 237 | + delegates.add(new BarProcessor()); |
| 238 | +
|
| 239 | + CompositeItemProcessor processor = new CompositeItemProcessor(); |
| 240 | +
|
| 241 | + processor.setDelegates(delegates); |
| 242 | +
|
| 243 | + return processor; |
| 244 | +} |
| 245 | +---- |
| 246 | + |
| 247 | +[[filteringRecords]] |
| 248 | +==== Filtering Records |
| 249 | + |
| 250 | +One typical use for an item processor is to filter out records before they are passed to |
| 251 | +the `ItemWriter`. Filtering is an action distinct from skipping. Skipping indicates that |
| 252 | +a record is invalid, while filtering simply indicates that a record should not be |
| 253 | +written. |
| 254 | + |
| 255 | +For example, consider a batch job that reads a file containing three different types of |
| 256 | +records: records to insert, records to update, and records to delete. If record deletion |
| 257 | +is not supported by the system, then we would not want to send any "delete" records to |
| 258 | +the `ItemWriter`. But, since these records are not actually bad records, we would want to |
| 259 | +filter them out rather than skip them. As a result, the `ItemWriter` would receive only |
| 260 | +"insert" and "update" records. |
| 261 | + |
| 262 | +To filter a record, you can return `null` from the `ItemProcessor`. The framework detects |
| 263 | +that the result is `null` and avoids adding that item to the list of records delivered to |
| 264 | +the `ItemWriter`. As usual, an exception thrown from the `ItemProcessor` results in a |
| 265 | +skip. |
| 266 | + |
| 267 | +[[validatingInput]] |
| 268 | +==== Validating Input |
| 269 | + |
| 270 | +In the <<readersAndWriters.adoc#readersAndWriters,ItemReaders and ItemWriters>> chapter, multiple approaches to parsing input have been |
| 271 | +discussed. Each major implementation throws an exception if it is not 'well-formed'. The |
| 272 | +`FixedLengthTokenizer` throws an exception if a range of data is missing. Similarly, |
| 273 | +attempting to access an index in a `RowMapper` or `FieldSetMapper` that does not exist or |
| 274 | +is in a different format than the one expected causes an exception to be thrown. All of |
| 275 | +these types of exceptions are thrown before `read` returns. However, they do not address |
| 276 | +the issue of whether or not the returned item is valid. For example, if one of the fields |
| 277 | +is an age, it obviously cannot be negative. It may parse correctly, because it exists and |
| 278 | +is a number, but it does not cause an exception. Since there are already a plethora of |
| 279 | +validation frameworks, Spring Batch does not attempt to provide yet another. Rather, it |
| 280 | +provides a simple interface, called `Validator`, that can be implemented by any number of |
| 281 | +frameworks, as shown in the following interface definition: |
| 282 | + |
| 283 | +[source, java] |
| 284 | +---- |
| 285 | +public interface Validator<T> { |
| 286 | +
|
| 287 | + void validate(T value) throws ValidationException; |
| 288 | +
|
| 289 | +} |
| 290 | +---- |
| 291 | + |
| 292 | +The contract is that the `validate` method throws an exception if the object is invalid |
| 293 | +and returns normally if it is valid. Spring Batch provides an out of the box |
| 294 | +`ValidatingItemProcessor`, as shown in the following bean definition: |
| 295 | + |
| 296 | +.XML Configuration |
| 297 | +[source, xml, role="xmlContent"] |
| 298 | +---- |
| 299 | +<bean class="org.springframework.batch.item.validator.ValidatingItemProcessor"> |
| 300 | + <property name="validator" ref="validator" /> |
| 301 | +</bean> |
| 302 | +
|
| 303 | +<bean id="validator" class="org.springframework.batch.item.validator.SpringValidator"> |
| 304 | + <property name="validator"> |
| 305 | + <bean class="org.springframework.batch.sample.domain.trade.internal.validator.TradeValidator"/> |
| 306 | + </property> |
| 307 | +</bean> |
| 308 | +---- |
| 309 | + |
| 310 | +.Java Configuration |
| 311 | +[source, java, role="javaContent"] |
| 312 | +---- |
| 313 | +@Bean |
| 314 | +public ValidatingItemProcessor itemProcessor() { |
| 315 | + ValidatingItemProcessor processor = new ValidatingItemProcessor(); |
| 316 | +
|
| 317 | + processor.setValidator(validator()); |
| 318 | +
|
| 319 | + return processor; |
| 320 | +} |
| 321 | +
|
| 322 | +@Bean |
| 323 | +public SpringValidator validator() { |
| 324 | + SpringValidator validator = new SpringValidator(); |
| 325 | +
|
| 326 | + validator.setValidator(new TradeValidator()); |
| 327 | +
|
| 328 | + return validator; |
| 329 | +} |
| 330 | +---- |
| 331 | + |
| 332 | +You can also use the `BeanValidatingItemProcessor` to validate items annotated with |
| 333 | +the Bean Validation API (JSR-303) annotations. For example, given the following type `Person`: |
| 334 | + |
| 335 | +[source, java] |
| 336 | +---- |
| 337 | +class Person { |
| 338 | +
|
| 339 | + @NotEmpty |
| 340 | + private String name; |
| 341 | +
|
| 342 | + public Person(String name) { |
| 343 | + this.name = name; |
| 344 | + } |
| 345 | +
|
| 346 | + public String getName() { |
| 347 | + return name; |
| 348 | + } |
| 349 | +
|
| 350 | + public void setName(String name) { |
| 351 | + this.name = name; |
| 352 | + } |
| 353 | +
|
| 354 | +} |
| 355 | +---- |
| 356 | + |
| 357 | +you can validate items by declaring a `BeanValidatingItemProcessor` bean in your |
| 358 | +application context and register it as a processor in your chunk-oriented step: |
| 359 | + |
| 360 | +[source, java] |
| 361 | +---- |
| 362 | +@Bean |
| 363 | +public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception { |
| 364 | + BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>(); |
| 365 | + beanValidatingItemProcessor.setFilter(true); |
| 366 | +
|
| 367 | + return beanValidatingItemProcessor; |
| 368 | +} |
| 369 | +---- |
| 370 | + |
| 371 | +[[faultTolerant]] |
| 372 | +==== Fault Tolerance |
| 373 | + |
| 374 | +When a chunk is rolled back, items that have been cached during reading may be |
| 375 | +reprocessed. If a step is configured to be fault tolerant (typically by using skip or |
| 376 | +retry processing), any `ItemProcessor` used should be implemented in a way that is |
| 377 | +idempotent. Typically that would consist of performing no changes on the input item for |
| 378 | +the `ItemProcessor` and only updating the |
| 379 | +instance that is the result. |
0 commit comments