第十四章. 工作流中使用 Quartz
Quartz 可以执行一个难以置信的 Job,来完成预计的任务。不幸的是,用来运行一个业务的 Job 经常比单一的 Job 或任务要稍稍复杂。每年百万计的美金花费到理解、设计和构建组织的业务流程。Quartz 框架包含一些设施用于把多个 Job 链接起来构建一个简单的业务流程模型 。本章讨论你能如何用 Quartz 连接 Job。为获得实际的工作流可操作性,你还需要一些来自于 Quartz 框架的东西。本章就来看为实现你的 Job 所构成的工作流可以如何扩展 Quartz 框架。
一. 什么是工作流
Web 上聚集了个人或团体关于工作流的定义和实例。有人定义工作流为“自动化的后台管理系统”。另一些人使用“业务流程建模” 一语,并收取许多的咨询费用向你解释这个概念。对于本章的要义,我们使用如下的工作流定义:
工作流是出现在某一特定时序中的一系列互为依懒的任务。
在我们进一步深入到本章内容后,该定义将变得越发明淅。
·工作流中什么地方需要 Quartz ?
如果你问,“工作流中什么地方要使用 Quartz 来效力?” 答案就是,“相当的多。” 甚至是像自动执行构建或只是发送电子邮件的简单任务里,都有工作流的位置。Quartz 支持一些基本的途径来把多个 Job 串起来。本章就讨论那些并展示如何集成 Quartz 到一个流行的开源的工作流解决方案。
二. Quartz 中的 Job 串联
Job 串联这一主题是在 Quartz 用户论坛中随着时间的推移而出现的。实际上,问的人多了,它已成为 Quartz FAQ (见 http://www.opensymphony.com/quartz/faq.html#chaining) 的一部分了。无论他们是否实现了他,多数问询了 Quartz 是否支持 Job 串联的 用户实际上问的是,“我该如何把工作流加到 Quartz 中?” 但是在我们深入到 OSWorkflow 之前,让我们瞧瞧使用 Quartz 框架自带的东西你该如何完成 Job 的串联。
·Job 串联不是工作流
为明确起见,我们应该认清:Job 串联是指 Quartz Job 在前一个 Job 完成时有条件或无条件的部署另一个 Job。单独使用 Quartz 框架来达成这一过程会受困于一些问题和限制。然而,还是值得去练习一下,以便于你能充分了解那些限制。
接下来的陈述我们将使你们中的某些人要抓狂了,但是你会从这份材料中获息,Quartz 中的 Job 串联并非工作流。它或许和工作流有些类似,有着工作流的气味让人感觉到像是工作流,但它确确实实不是工作流,你很快就会知道这一点的。你可以认为这是“懒汉式工作流”,在小额预算时权作一种工作流类型。严格意义上,像 OSWorkflow 那样的工作流系统提供了比你能从 Quartz 的 Job 串联获得的更多的功能。这不是在打击 Quartz:Quartz 是为 Job 调度设计的,它也做得非常好。工作流框架,像 OSWorkflow 是设计来实现工作流程的。两者都是伟大的工作。
·用监听器实现 Job 串联
第一种实现 Job 串联的方法是使用 Quartz 监听器。这是通过创建一个 JobListener 或 TriggerListener 来完成,当它们被 Scheduler 通知时,就为本次执行部署上下一个 Job。JobListener 的 jobWasExecuted() 方法,或 TriggerListener 的 TriggerComplete() 是用来“链接”到下一个 Job 的地方。我们假定,你有一个执行一些重要业务逻辑的 Job,叫做 ImportantJob。你就像先前创建其他任何的 Quartz Job 那样创建它。代码 14.1 显示了这个 Job 的概要,描述了你的 Quartz 应用需要执行某一重要的 Job。
代码 14.1. ImportantJob,描述了你可能为你的业务所要执行的 Job
1 2 3 4 5 6 |
public class ImportantJob implements Job { public void execute(JobExecutionContext context) { // Do something important in this Job } } |
注意到代码 14.1 中的 Job 什么也没指定。让我们进一步假如你需要在代码 14.1 中所示的 ImportantJob 结束时链接到第二个 Job。你可以选择任何 Job,不过我们让这个 Job 打印出之前运行的 Job 的一些细节信息。把它叫做 PrintJobResultJob (见 代码 14.2)。
代码 14.2. PrintJobResultJob 打印有关之前运行的被链接的 Job 的信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public class PrintJobResultJob implements Job { Log logger = LogFactory.getLog(PrintJobResultJob.class); public void execute(JobExecutionContext context) { // Get the JobResult for the previous chained Job JobResult jobResult = (JobResult) context.getJobDataMap().get("JOB_RESULT"); // If no Job was chained before this one, do nothing if (jobResult != null) { logger.info(jobResult); } } } |
PrintJobResultJob 设计为察看它的 JobDataMap 来确定是否存在一个 JobResult 对象。类 JobResult 不是 Quartz 框架的组成部分,但是你能不费力的创建它来表示 Job 的执行结果。在许多实例中,创建一个像 JobResult 的类是很有帮助的。代码 14.3 显示了我们例子中的 JobResult 类。
代码 14.3. JobResult 表示 Job 的执行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
public class JobResult { private boolean success; private String jobName; private long startedTime; private long finishedTime; public JobResult(){ startedTime = System.currentTimeMillis(); } // getters and setters not shown in this listing public String toString() { StringBuffer buf = new StringBuffer(); buf.append(jobName); buf.append(" executed in "); buf.append(finishedTime - startedTime); buf.append(" (msecs) "); if (success) { buf.append("and was successful. "); } else { buf.append("but was NOT successful. "); } return buf.toString(); } } |
代码 14.3 中 JobResult 类包含了有关 Job 执行结果的几块信息:启动时间,完成时间,和指示是否执行成功的标志。显然,你也可以在你的版本中加任何需要的字段;这个只是一个简单的例子。
在这个 Job 串联例子的下一步是创建一个监听器类来执行实际的串联操作。对于本例中,我们将用一个 JobListener,只要一个 TriggerListener 就可很好的工作。代码 14.5 显示了这个 Job 串联的 JobListener.
代码 14.5. 一个执行 Job 串联的非全局 JobListener
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
public class JobChainListener implements org.quartz.JobListener { Log logger = LogFactory.getLog(JobChainListener.class); public Class nextJobClass; public String listenerName; public JobChainListener() { super(); } public JobChainListener(String listenerName, Class nextJob) { setName(listenerName); this.nextJobClass = nextJob; } public String getName() { return listenerName; } public void setName(String name) { this.listenerName = name; } public void jobToBeExecuted(JobExecutionContext context) { // Do nothing in this example } public void jobExecutionVetoed(JobExecutionContext context) { // Do nothing in this example } public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) { Scheduler scheduler = context.getScheduler(); try { // Create the chained JobDetail JobDetail jobDetail = new JobDetail("ChainedJob", null, nextJobClass); // Create a one-time trigger that fires immediately Trigger trigger = TriggerUtils.makeSecondlyTrigger(0, 0); trigger.setName("FireNowTrigger"); trigger.setStartTime(new Date()); // Update the JobResult for the next Job JobResult jobResult = (JobResult) context.getJobDataMap().get("JOB_RESULT"); jobResult.setFinishedTime(System.currentTimeMillis()); jobResult.setSuccess(true); // Pass JobResult to next job through its JobDataMap jobDetail.getJobDataMap().put("JOB_RESULT", jobResult); // Schedule the next job to fire immediately scheduler.scheduleJob(jobDetail, trigger); logger.info(nextJobClass.getName() + " has been scheduled executed"); } catch (Exception ex) { logger.error("Couldn't chain next Job", ex); return; } } } |
如你从代码 14.5 中看到的,链接到下一个 Job 的工作是在 jobWasExecuted() 方法中完成。你已从第七章,“实现 Quartz 监听器” 学习到,Scheduler 在 Job 完成执行时调用 jobWasExecuted() 方法的。使得这是把 Job 串链在一起的最佳方法。在代码 14.5 的 jobWasExecuted() 方法中,进行了几件事情。
首先,为被链接 Job 新建了一个 JobDetail 和 Trigger。接着从 JobDataMap 中获取了第一个 Job 的 JobResult,并设置了 finishedTime 和 success 字段的值。为了让被链接的(下一个) Job 能访问到 JobResult 对象,它被载入到被链接 Job 的 JobDataMap 中
传递数据从一个 Job 到另一个JobResult 的想法用在这个例子中描绘了,尽管是可以传递数据从一个 Job 到被链接的的 Job,但还是有些笨拙的。在当前的例子中,数据是在监听器来传递的。这是可发生串联操作唯一的地方。假如你需要串联三个 Job,这种纠缠关系就会变得很糟。(译者 Unmi 注:不知所云) |
代码 14.5 的最后部分部署了一个新的 Job 到 Scheduler 上。因为新 Job 的 Trigger 设置为立刻触发,PrintJobResultJob 将会立即执行。回看代码 14.2,你会发现当 PrintJobResultJob 类的 execute() 方法被执行时,它会获取到 JobResult 对象,并调用它的 toString() 方法。再次说一下,这是一个向你展示如何串联 Job 的非常简单的例子。你的 Job 显然会有比这更复杂的逻辑。然而 Job 串联工作是一样的。
因此,好消息是问题不在于使用监听器类来串联 Job 的难度上。坏消息是这种用法存在有几个严重的设计问题。首先,尽管我们足够的聪明把下一 Job 的名字传到了监听器类中,但这段代码有着相当紧的耦合性。监听器需要在创建之时被告知链接的 Job,所以监听在它的生命期内变得与某一特定被链接 Job 紧密的耦合起来了。你需要为每一个被链接的 Job 分别建立监听器,那么对于需要链接到多余两个 Job 时又该如何呢?事情很快就变得不好控制了。
监听器用法的变化 当然,存在一些不同的方式让你能实现你的监听器使之工作。例如,你可以创建一个独立的 JobDetail 实例,设置它的 durability 标志为 true,并预存储到 Scheduler 中。这样的话,监听器就不需要每次都创建这个 JobDetail 和相应的 Trigger;只需要创建一个 JobDataMap,把 JobResult 置于其中,并调用 scheduler.triggerJob(jobName, groupName, jobDataMap);这个已存在的 Job 就会被执行并传递 JobResult。 |
·使用 JobDataMap 进行 Job 串联
另一串联 Job 的方法是使用 JobDataMap 来存储下一个要执行的 Job。在早先监听器的例子中,我们使用了 JobDataMap 来存储和传递 JobResult,但是这一新用法可以摈弃监听器而用 JobDataMap 来完成这一切。
因为可以让我们摆脱监听器类了,这就意味着 Job 必须自己处理链接到下一 Job。需要的话,这个行为可以抽象到一个基础 Job 类中。不过,这儿的例子为保持尽可能的简单,所以不那么做。
代码 14.5中展示了新的 ImportantJob 类。在它完成之后,链中的下一 Job 就得到部署了。你可能还想基于某个标志或是上次执行的条件来决定部署下一 Job。例如,假如某标志设置为 true 时,你要执行 Job A;而在该标志为 false,你会要执行 Job B。
链中的下一 Job 由你选择一个 Key 存键在 JobDataMap 中。在本例中,我们使用 NEXT_JOB。这种用法的其中一个问题是必须在 Job 执行之前把下一 Job 存入到 JobDataMap 中。
代码 14.5. Job 串联也能用 JobDataMap 实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
public class ImportantJob implements Job { static Log logger = LogFactory.getLog(JobChainListener.class); public void execute(JobExecutionContext context) { // Do something important in this Job // Set some condition based on this Job execution boolean success = true; // schedule the next Job if condition was successful if (success) { scheduleNextJob(context); } else { logger.info("Job was NOT chained"); } } protected void scheduleNextJob(JobExecutionContext context) { JobDataMap jobDataMap = context.getJobDataMap(); String nextJob = jobDataMap.getString("NEXT_JOB"); if (nextJob!= null && nextJob.length() > 0) { try { Class jobClass = Class.forName(nextJob); scheduleJob(jobClass, context.getScheduler()); } catch (Exception ex) { logger.error("error scheduling chained job", ex); } } } protected void scheduleJob(Class jobClass, Scheduler scheduler) { JobDetail jobDetail = new JobDetail(jobClass.getName(), null, jobClass); // Create a fire now, one time trigger Trigger trigger = TriggerUtils.makeSecondlyTrigger(0, 0); trigger.setName(jobClass.getName() + "Trigger"); trigger.setStartTime(new Date()); // Schedule the next job to fire immediately try { scheduler.scheduleJob(jobDetail, trigger); } catch (SchedulerException ex) { logger.error("error chaining Job " + jobClass.getName(), ex); } } } |
从 JobDataMap 中获取到 Job 的的名称之后,创建了 JobDetail 和 Trigger,并部署了 Job。和 和前面监听器的例子一样,部署首个 Job 的代码需要加入最初被链接的 Job 到 JobDataMap 来让一切正常工作。这能在 Job 首次加到 Scheduler 和 Scheduler 启动之后来做。代码 14.6 展示了这些。
代码 14.6. 第一个被链接的 Job 需要在第一个 Job 被部署时配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
public class NewScheduler { static Log logger = LogFactory.getLog(NewScheduler.class); public static void main(String[] args) { try { // Create and start the Scheduler Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); scheduler.start(); JobDetail jobDetail = new JobDetail("ImportantJob", null, ImportantJob.class); // Set up the first chained Job JobDataMap dataMap = jobDetail.getJobDataMap(); dataMap.put("NEXT_JOB", "org.cavaness.quartzbook.chapter14.ChainedJob"); // Create the trigger and scheduler the Job Trigger trigger = TriggerUtils.makeSecondlyTrigger(10000, 0); trigger.setName("FireOnceTrigger"); trigger.setStartTime(new Date()); scheduler.scheduleJob(jobDetail, trigger); } catch (SchedulerException ex) { logger.error( ex ); } } } |
14.6 中的代码是用来部署第一个 Job,并且同时设置了链中的下一个 Job。如果有第三个 Job,它就不得不在第二个 Job 的 JobDataMap 中设置了。你很明显能看出在链中当有多余两个 Job 时会有多笨拙。OSWorkflow 能帮上忙了,还能解决其他一些乱糟糟的问题。
·为 Job 串联使用 JobInitializationPlugin
如果你在 quartz_jobs.xml 文件中指明你的 Job 信息,并使用 JobInitializationPlugin 来加载那些信息。这种用法也许不那么的坏。那是因为你可以很容易的在 XML 文件中指定 Job 链。例如,就看代码 14.7 中的 quartz_jobs.xml。
代码 14.7. 使用 JobInitializationPlugin 时 Job 串联还有了一定程度的可理性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
<?xml version='1.0' encoding='utf-8'?> <quartz> <job> <job-detail> <name>ImportantJob</name> <job-class> org.cavaness.quartzbook.chapter14.ImportantJob </job-class> <job-data-map allows-transient-data="true"> <entry> <key>NEXT_JOB</key> <value>org.cavaness.quartzbook.chapter14.ChainedJob</value> </entry> </job-data-map> </job-detail> <trigger> <simple> <name>FireOnceTrigger</name> <group>DEFAULT</group> <job-name>ImportantJob</job-name> <job-group>DEFAULT</job-group> <start-time>2005-07-19 8:31:00 PM</start-time> <repeat-count>0</repeat-count> </simple> </trigger> </job> </quartz> |
在代码 14.7 中 Job 信息的结果和前的例子是一样的,还更好些。如果你需要改变哪个 Job 会链接到 ImportatJob 的话,你仅需要修改这个 XML 文件。在前面的 Job 串联的例子中,不得不修改代码然后重新编译。
[译者 Unmi 注:] 本部分作者想要表达的内容确如他之前有交待的那样,会让人觉得有些抓狂。从前面各章节一路下来,唯见本部分有些描述让人难以琢磨。请读者朋友结合实际用意仔细理解。另:对 job-chaining 使用的是 Job 串联的说法,自我感觉还没有到位,但还没有想到更好的说法。单独的 chain 用的链(接)。
本文链接 https://yanbin.blog/quartz-job-scheduling-framework-14-1/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
使用chain时,如果chain里需要执行两次myjob,但是两次执行需要的参数值不同,如何处理呢,jobdatamap感觉是和job类绑定的