Java EE 7: An Overview of Batch Processing
Batch processing is used in many industries for tasks ranging from payroll processing and statement generation to end-of-day jobs such as interest calculation and ETL (extract, transform, and load) in a data warehouse, and many more. Typically, batch processing is bulk-oriented, non-interactive, and long running, and it might be data- or computation-intensive. Batch jobs can be run on a schedule or initiated on demand. Also, since batch jobs are typically long-running jobs, checkpointing and restarting are common features found in batch jobs.
JSR 352 (Batch Applications for the Java Platform), part of the recently introduced Java EE 7 platform, defines the programming model for batch applications plus a runtime to run and manage batch jobs. This article covers some of the key concepts, including feature highlights, an overview of selected APIs, the structure of the Job Specification Language, and a sample batch application. The article also describes how you can run batch applications using GlassFish Server Open Source Edition 4.0.
Batch Processing Architecture
This section and Figure 1 describe
the basic components of the batch processing architecture.
- A job encapsulates the entire batch process. A job contains one or more steps. A job is put together using a Job Specification Language (JSL) that specifies the sequence in which the steps must be executed. In JSR 352, JSL is specified in an XML file called the job XML file. In short, a job (with JSR 352) is basically a container for steps.
- A step is a domain object that encapsulates
an independent, sequential phase of the job. A step contains all
the necessary logic and data to perform the actual processing. The
batch specification deliberately leaves the definition of a step
vague because the content of a step is purely application-specific
and can be as complex or simple as the developer desires. There are
two kinds of steps: chunk and batchlet.
  - A chunk-style step contains exactly one ItemReader, one ItemProcessor, and one ItemWriter. In this pattern, ItemReader reads one item at a time, ItemProcessor processes the item based upon the business logic (such as "calculate account balance"), and hands it to the batch runtime for aggregation. Once the "chunk-size" number of items are read and processed, they are given to an ItemWriter, which writes the data (for example, to a database table or a flat file). The transaction is then committed.
  - JSR 352 also defines a roll-your-own kind of a step called a batchlet. A batchlet is free to use anything to accomplish the step, such as sending an e-mail.
- JobOperator provides an interface to manage all aspects of job processing, including operational commands, such as start, restart, and stop, as well as job repository commands, such as retrieval of job and step executions. See section 10.4 of the JSR 352 specification for more details about JobOperator.
- JobRepository holds information about jobs currently running and jobs that ran in the past. JobOperator provides APIs to access this repository. A JobRepository could be implemented using, say, a database or a file system.
Developing a simple Payroll processing application
This article demonstrates some of the key features of JSR 352 using a simple payroll processing application. The application has been intentionally kept quite simple in order to focus on the key concepts of JSR 352.

The SimplePayrollJob batch job involves reading input data for payroll processing from a comma-separated values (CSV) file. Each line in the file contains an employee ID and the base salary (per month) for one employee. The batch job then calculates the tax to be withheld, the bonus, and the net salary. The job finally needs to write out the processed payroll records into a database table.

We use a CSV file in this example just to demonstrate that JSR 352 allows batch applications to read and write from any arbitrary source.
Job specification language for the Payroll processing application
We discussed that a step is a domain object that encapsulates an independent, sequential phase of the job, and a job is basically a container for one or more steps.

In JSR 352, a JSL basically specifies the order in which steps must be executed to accomplish the job. The JSL is powerful enough to allow conditional execution of steps, and it also allows each step to have its own properties, listeners, and so on.
A batch application can have as many JSLs as it wants, thus allowing it to start as many batch jobs as required. For example, an application can have two JSLs, one for payroll processing and another for report generation. Each JSL must be named uniquely and must be placed in the META-INF/batch-jobs directory. Subdirectories under META-INF/batch-jobs are ignored.

Our JSL for payroll processing is placed in a file called SimplePayrollJob.xml and looks like Listing 1:

    <job id="SimplePayrollJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
        <step id="process">
            <chunk item-count="2">
                <reader ref="simpleItemReader"/>
                <processor ref="simpleItemProcessor"/>
                <writer ref="simpleItemWriter"/>
            </chunk>
        </step>
    </job>
Our SimplePayrollJob batch job has just one step (called "process"). It is a chunk-style step and has (as required for a chunk-style step) an ItemReader, an ItemProcessor, and an ItemWriter. The implementations for ItemReader, ItemProcessor, and ItemWriter for this step are specified using the ref attribute in the <reader>, <processor>, and <writer> elements.

When the job is submitted (we will see later how to submit batch jobs), the batch runtime starts with the first step in the JSL and walks its way through until the entire job is completed or one of the steps fails. The JSL is powerful enough to allow both conditional steps and parallel execution of steps, but we will not cover those details in this article.
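To make the nesting of job, step, chunk, and artifact references concrete, here is a minimal sketch that reads a job XML with the JDK's DOM parser and prints the values the runtime would care about. The class JobXmlSketch and its methods are hypothetical names used only for illustration; a JSR 352 runtime does its own parsing, and nothing here is part of the batch API.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

// Illustration only: inspects a job XML with the JDK DOM parser.
// This is a hypothetical helper, not how a batch runtime is implemented.
class JobXmlSketch {

    // Same structure as the payroll job XML: one chunk-style step.
    static final String JOB_XML =
          "<job id=\"SimplePayrollJob\" xmlns=\"http://xmlns.jcp.org/xml/ns/javaee\" version=\"1.0\">"
        + "  <step id=\"process\">"
        + "    <chunk item-count=\"2\">"
        + "      <reader ref=\"simpleItemReader\"/>"
        + "      <processor ref=\"simpleItemProcessor\"/>"
        + "      <writer ref=\"simpleItemWriter\"/>"
        + "    </chunk>"
        + "  </step>"
        + "</job>";

    // Returns the ref attribute of the first <tag> element under parent.
    static String ref(Element parent, String tag) {
        return ((Element) parent.getElementsByTagName(tag).item(0)).getAttribute("ref");
    }

    // Summarizes the job id, step id, chunk size, and artifact names.
    static String describe(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Element job = doc.getDocumentElement();
            Element step = (Element) job.getElementsByTagName("step").item(0);
            Element chunk = (Element) step.getElementsByTagName("chunk").item(0);
            return "job=" + job.getAttribute("id")
                    + " step=" + step.getAttribute("id")
                    + " item-count=" + chunk.getAttribute("item-count")
                    + " reader=" + ref(chunk, "reader")
                    + " processor=" + ref(chunk, "processor")
                    + " writer=" + ref(chunk, "writer");
        } catch (Exception e) {
            throw new IllegalStateException("Could not parse job XML", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(JOB_XML));
    }
}
```

The three ref values are the names the batch runtime resolves to the reader, processor, and writer implementations for the step.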
The item-count attribute, which is defined as 2 in Listing 1, sets the chunk size.

Here is a high-level overview of how chunk-style steps are executed. Please see section "Regular Chunk Processing" of the JSR 352 specification for more details.
1. Start a transaction.
2. Invoke the ItemReader and pass the item read by the ItemReader to the ItemProcessor. ItemProcessor processes the item and returns the processed item to the batch runtime.
3. The batch runtime repeats Step 2 item-count times and maintains a list of processed items.
4. The batch runtime invokes the ItemWriter that writes item-count number of processed items.
5. If exceptions are thrown from ItemReader, ItemProcessor, or ItemWriter, the transaction fails and the step is marked as "FAILED." Please refer to Section 5.2.1.2.1 ("Skipping Exceptions") in the JSR 352 specification.
6. If there are no exceptions, the batch runtime obtains checkpoint data from ItemReader and ItemWriter (see section 2.5 in the JSR 352 specification for more details). The batch runtime commits the transaction.
7. Steps 1 through 6 are repeated if the ItemReader has more data to read.
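The loop described in these steps can be sketched in plain Java. This is a simulation under stated assumptions, not the JSR 352 runtime: the class ChunkLoopSketch and its run method are hypothetical, the reader is modeled as an Iterator, the processor as a Function, and the "writer" simply records each chunk it is handed. Transactions, checkpoints, and the failure path of step 5 appear only as comments.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Plain-Java simulation of the chunk loop; NOT the batch runtime itself.
class ChunkLoopSketch {

    // Runs read-process-write and returns every chunk handed to the "writer".
    static <I, O> List<List<O>> run(Iterator<I> reader, Function<I, O> processor, int itemCount) {
        List<List<O>> writtenChunks = new ArrayList<>();
        boolean moreData = true;
        while (moreData) {
            // Step 1: a real runtime would start a transaction here.
            List<O> chunk = new ArrayList<>();
            // Steps 2-3: read and process up to itemCount items.
            while (chunk.size() < itemCount) {
                if (!reader.hasNext()) {
                    moreData = false; // reader is exhausted
                    break;
                }
                chunk.add(processor.apply(reader.next()));
            }
            // Step 4: hand the buffered items to the writer (here: record them).
            if (!chunk.isEmpty()) {
                writtenChunks.add(chunk);
            }
            // Step 6: a real runtime would collect checkpoint data and commit here.
        }
        return writtenChunks;
    }

    public static void main(String[] args) {
        List<Integer> salaries = Arrays.asList(8000, 9000, 10000);
        // Chunk size 2, matching the item-count in the payroll job XML.
        List<List<String>> chunks = run(salaries.iterator(), s -> "salary=" + s, 2);
        System.out.println(chunks); // prints [[salary=8000, salary=9000], [salary=10000]]
    }
}
```

Note that the last chunk may hold fewer than item-count items when the input runs out partway through a chunk.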
With item-count set to 2 in Listing 1, the ItemWriter will write out two records per transaction.

Writing the ItemReader, ItemProcessor and ItemWriter
Writing the ItemReader
Our payroll processing batch JSL defines a single chunk-style step and specifies that the step uses an ItemReader named simpleItemReader. Our application contains an implementation of ItemReader to read input CSV data. Listing 2 shows a snippet of our ItemReader:

    import javax.batch.api.chunk.AbstractItemReader;
    import javax.batch.runtime.context.JobContext;
    import javax.inject.Inject;
    import javax.inject.Named;

    @Named
    public class SimpleItemReader extends AbstractItemReader {
        @Inject
        private JobContext jobContext;
        ...
    }
Note that the class is annotated with the @Named annotation. Because the @Named annotation uses the default value, the Contexts and Dependency Injection (CDI) name for this bean is simpleItemReader. The JSL specifies the CDI name of the ItemReader in the <reader> element. This allows the batch runtime to instantiate (through CDI) our ItemReader when the step is executed.

Our ItemReader also injects a JobContext. Through the JobContext, the batch artifact (ItemReader, in this case) can obtain the execution ID and use it to look up the values that were passed during job submission.

Our payroll SimpleItemReader overrides the open() method to open the input from which payroll input data is read. As we shall see later, the parameter prevCheckpointInfo will not be null if the job is being restarted.

In our example, the open() method, which is shown in Listing 3, opens the payroll input file (which has been packaged along with the application).

    public void open(Serializable prevCheckpointInfo) throws Exception {
        // Look up the parameters that were passed when the job was submitted
        JobOperator jobOperator = BatchRuntime.getJobOperator();
        Properties jobParameters = jobOperator.getParameters(jobContext.getExecutionId());
        String resourceName = (String) jobParameters.get("payrollInputDataFileName");

        inputStream = new FileInputStream(resourceName);
        br = new BufferedReader(new InputStreamReader(inputStream));

        // On a restart, resume from the checkpointed record number
        if (prevCheckpointInfo != null) {
            recordNumber = (Integer) prevCheckpointInfo;
        }
        for (int i = 1; i < recordNumber; i++) { // Skip up to recordNumber
            br.readLine();
        }
        System.out.println("[SimpleItemReader] Opened Payroll file for reading from record number: " + recordNumber);
    }
The readItem() method reads one line of data from the input file and parses the two integers it is expected to contain (one for employee ID and one for base salary). It then creates a new instance of PayrollInputRecord and returns it to the batch runtime, which passes it to the ItemProcessor. When the input is exhausted, readItem() returns null, which tells the batch runtime that there is no more data to read.

Listing 4:

    public Object readItem() throws Exception {
        Object record = null;
        // Read the next line; null means the input is exhausted
        String line = br.readLine();
        if (line != null) {
            String[] fields = line.split("[, \t\r\n]+");
            PayrollInputRecord payrollInputRecord = new PayrollInputRecord();
            payrollInputRecord.setId(Integer.parseInt(fields[0]));
            payrollInputRecord.setBaseSalary(Integer.parseInt(fields[1]));
            record = payrollInputRecord;
            // Now that we could successfully read, increment the record number
            recordNumber++;
        }
        return record;
    }
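Listing 4 calls Integer.parseInt directly, so a malformed line would surface as an exception and fail the chunk. If you would rather detect bad input explicitly, a defensive check might look like the following sketch. The class PayrollLineParser and its parse method are hypothetical helpers, not part of the article's application; they reuse the same split pattern as Listing 4 and return the two integers only when the line holds exactly two.

```java
import java.util.Optional;

// Hypothetical helper: validates a payroll CSV line before it is used.
class PayrollLineParser {

    // Returns {employeeId, baseSalary} when the line holds exactly two
    // integers; otherwise returns Optional.empty().
    static Optional<int[]> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        // Same delimiter pattern as Listing 4: commas and whitespace
        String[] fields = line.trim().split("[, \t\r\n]+");
        if (fields.length != 2) {
            return Optional.empty();
        }
        try {
            int id = Integer.parseInt(fields[0]);
            int baseSalary = Integer.parseInt(fields[1]);
            return Optional.of(new int[] { id, baseSalary });
        } catch (NumberFormatException e) {
            return Optional.empty(); // a field was not an integer
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("101, 8000").isPresent()); // prints true
        System.out.println(parse("101").isPresent());       // prints false
        System.out.println(parse("abc, 8000").isPresent()); // prints false
    }
}
```

Whether a bad line should be skipped or should fail the step is an application decision; the JSR 352 specification covers skippable exceptions for chunk steps.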
The method checkpointInfo() is called by the batch runtime at the end of every successful chunk transaction. This allows the reader to checkpoint the last successful read position.

In our example, checkpointInfo() returns the recordNumber indicating the number of records that have been read successfully, as shown in Listing 5.

    @Override
    public Serializable checkpointInfo() throws Exception {
        return recordNumber;
    }
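To see how open() and checkpointInfo() cooperate across a restart, here is a small plain-Java stand-in. It mimics the open/readItem/checkpointInfo shape of Listings 3 through 5 but does not use the javax.batch API, and the class name CheckpointSketch is hypothetical. One assumption to flag: this sketch counts records read starting from zero and skips exactly that many lines on restart, whereas the article's field declaration for recordNumber is elided and may be initialized differently.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Serializable;
import java.io.StringReader;
import java.io.UncheckedIOException;

// Plain-Java stand-in for a checkpointing reader; not a javax.batch artifact.
class CheckpointSketch {
    private final String data;
    private BufferedReader br;
    private int recordsRead; // assumption: counts records read, starting at 0

    CheckpointSketch(String data) {
        this.data = data;
    }

    // Opens the input and, on a restart, skips records already processed.
    void open(Serializable prevCheckpointInfo) {
        br = new BufferedReader(new StringReader(data));
        recordsRead = (prevCheckpointInfo == null) ? 0 : (Integer) prevCheckpointInfo;
        for (int i = 0; i < recordsRead; i++) {
            nextLine();
        }
    }

    // Returns the next record, or null when the input is exhausted.
    String readItem() {
        String line = nextLine();
        if (line != null) {
            recordsRead++;
        }
        return line;
    }

    // The value a runtime would store after a successful chunk.
    Serializable checkpointInfo() {
        return recordsRead;
    }

    private String nextLine() {
        try {
            return br.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String csv = "101,8000\n102,9000\n103,10000\n";

        CheckpointSketch first = new CheckpointSketch(csv);
        first.open(null);            // fresh start
        first.readItem();
        first.readItem();
        Serializable checkpoint = first.checkpointInfo(); // two records read

        CheckpointSketch restarted = new CheckpointSketch(csv);
        restarted.open(checkpoint);  // restart from the checkpoint
        System.out.println(restarted.readItem()); // prints 103,10000
    }
}
```

The checkpoint only needs to be Serializable so that the runtime can store it in its job repository and hand it back to open() on restart.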
Writing the ItemProcessor
Our SimpleItemProcessor follows a pattern similar to the pattern for SimpleItemReader.

The processItem() method receives (from the batch runtime) the PayrollInputRecord. It then calculates the tax, bonus, and net salary and returns a PayrollRecord as output. Notice in Listing 6 that the type of object returned by an ItemProcessor can be very different from the type of object it received from ItemReader.

    import javax.batch.api.chunk.ItemProcessor;
    import javax.batch.runtime.context.JobContext;
    import javax.inject.Inject;
    import javax.inject.Named;

    @Named
    public class SimpleItemProcessor implements ItemProcessor {
        @Inject
        private JobContext jobContext;

        public Object processItem(Object obj) throws Exception {
            PayrollInputRecord inputRecord = (PayrollInputRecord) obj;
            PayrollRecord payrollRecord = new PayrollRecord();

            int base = inputRecord.getBaseSalary();
            float tax = base * 27 / 100.0f;   // 27% withheld as tax
            float bonus = base * 15 / 100.0f; // 15% bonus

            payrollRecord.setEmpID(inputRecord.getId());
            payrollRecord.setBase(base);
            payrollRecord.setTax(tax);
            payrollRecord.setBonus(bonus);
            payrollRecord.setNet(base + bonus - tax);
            return payrollRecord;
        }
    }
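As a worked example of the arithmetic in Listing 6, the following standalone sketch applies the same 27% tax and 15% bonus formulas to a sample base salary. The class PayrollMathSketch and the salary of 8000 are illustrative assumptions, not values from the article's input file.

```java
// Standalone copy of the Listing 6 formulas, for checking the numbers by hand.
class PayrollMathSketch {

    // 27% of the base salary is withheld as tax.
    static float tax(int base) {
        return base * 27 / 100.0f;
    }

    // 15% of the base salary is paid as a bonus.
    static float bonus(int base) {
        return base * 15 / 100.0f;
    }

    // Net salary is base plus bonus minus tax.
    static float net(int base) {
        return base + bonus(base) - tax(base);
    }

    public static void main(String[] args) {
        int base = 8000; // hypothetical monthly base salary
        System.out.println("tax=" + tax(base));     // prints tax=2160.0
        System.out.println("bonus=" + bonus(base)); // prints bonus=1200.0
        System.out.println("net=" + net(base));     // prints net=7040.0
    }
}
```

The division by 100.0f is what forces floating-point arithmetic; with an integer divisor the fractional part of the tax and bonus would be truncated.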
Writing the ItemWriter
By now, the structure of SimpleItemWriter should look familiar.

The only difference is that it injects an EntityManager so that it can persist the PayrollRecord instances (which are JPA entities) into a database, as shown in Listing 7.

    import java.util.List;
    import javax.batch.api.chunk.AbstractItemWriter;
    import javax.inject.Named;
    import javax.persistence.EntityManager;
    import javax.persistence.PersistenceContext;

    @Named
    public class SimpleItemWriter extends AbstractItemWriter {
        @PersistenceContext
        EntityManager em;

        @Override
        public void writeItems(List<Object> list) throws Exception {
            for (Object obj : list) {
                System.out.println("PayrollRecord: " + obj);
                em.persist(obj);
            }
        }
    }

The writeItems() method persists all the PayrollRecord instances into a database table using JPA. There will be at most item-count entries (the chunk size) in the list.

Now that we have our JSL, ItemReader, ItemProcessor, and ItemWriter ready, let's see how a batch job can be submitted.

Starting a batch job from a servlet
Note that the mere presence of a job XML file or other batch artifacts (such as ItemReader) doesn't mean that a batch job is automatically started when the application is deployed. A batch job must be initiated explicitly, say, from a servlet or from an Enterprise JavaBeans (EJB) timer or an EJB business method.

In our payroll application, we use a servlet (named PayrollJobSubmitterServlet) to submit a batch job. The servlet displays an HTML page that presents to the user a form containing two buttons. When the first button, labeled Calculate Payroll, is clicked, the servlet invokes the startNewBatchJob method, shown in Listing 8, which starts a new batch job.

    private long startNewBatchJob() throws Exception {
        JobOperator jobOperator = BatchRuntime.getJobOperator();
        Properties props = new Properties();
        props.setProperty("payrollInputDataFileName", payrollInputDataFileName);
        return jobOperator.start(JOB_NAME, props);
    }
The first step is to obtain an instance of JobOperator. This can be done by calling the following:

    JobOperator jobOperator = BatchRuntime.getJobOperator();

The servlet then creates a Properties object and stores the input file name in it. Finally, a new batch job is started by calling the following:

    jobOperator.start(jobName, properties)
The jobName is simply the name of the job's JSL XML file (minus the .xml extension). The properties parameter serves to pass any input data to the job. The Properties object (containing the name of the payroll input file) is made available to other batch artifacts (such as ItemReader, ItemProcessor, and so on), which can look it up through JobOperator.getParameters() using the execution ID from the JobContext interface, as Listing 3 does.

The batch runtime assigns a unique ID, called the execution ID, to identify each execution of a job, whether it is a freshly submitted job or a restarted job. Many of the JobOperator methods take the execution ID as a parameter. Using the execution ID, a program can obtain the current (and past) execution status and other statistics about the job. The JobOperator.start() method returns the execution ID of the job that was started.

Retrieving details about batch jobs
When a batch job is submitted, the batch runtime creates an instance of JobExecution to track it. JobExecution has methods to obtain various details such as the job start time, job completion time, job exit status, and so on. To obtain the JobExecution for an execution ID, you can use the JobOperator.getJobExecution(executionId) method. Listing 9 shows the definition of JobExecution:

    package javax.batch.runtime;

    public interface JobExecution {
        long getExecutionId();
        java.lang.String getJobName();
        javax.batch.runtime.BatchStatus getBatchStatus();
        java.util.Date getStartTime();
        java.util.Date getEndTime();
        java.lang.String getExitStatus();
        java.util.Date getCreateTime();
        java.util.Date getLastUpdatedTime();
        java.util.Properties getJobParameters();
    }