多线程下载由来已久,如 FlashGet、NetAnts 等工具,它们都是依懒于 HTTP 协议的支持(Range 字段指定请求内容范围),首先能读取出请求内容 (即欲下载的文件) 的大小,划分出若干区块,把区块分段分发给每个线程去下载,线程从本段起始处下载数据及至段尾,多个线程下载的内容最终会写入到同一个文件中。
只研究有用的,工作中的需求:要把多个任务分派给多个线程去执行,这其中就会有一个任务列表指派到线程的策略思考:已知:1. 一个待执行的任务列表,2. 指定要启动的线程数;问题是:每个线程实际要执行哪些任务。
策略是:任务列表连续按线程数分段,先保证每线程平均能分配到的任务数,余下的任务从前至后依次附加到线程中--只是数量上,实际每个线程执行的任务都还是连续的。如果出现那种僧多(线程) 粥(任务) 少的情况,实际启动的线程数就等于任务数,一挑一。这里只实现了每个线程各扫自家门前雪,动作快的完成后眼见别的线程再累都是爱莫能助。
实现及演示代码如下:由三个类实现,写在了一个 java 文件中:TaskDistributor 为任务分发器,Task 为待执行的任务,WorkThread 为自定的工作线程。代码中运用了命令模式,如若能配以监听器,用上观察者模式来控制 UI 显示就更绝妙不过了,就能实现像下载中的区块着色跳跃的动感了,在此定义下一步的着眼点了。
代码中有较为详细的注释,看这些注释和执行结果就很容易理解的。main() 是测试方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
package com.unmi.common; import java.util.ArrayList; import java.util.List; /** * 指派任务列表给线程的分发器 * @author Unmi * QQ: 1125535 Email: fantasia@sina.com * MSN: kypfos@msn.com 2008-03-25 */ public class TaskDistributor { /** * 测试方法 * @param args */ public static void main(String[] args) { //初始化要执行的任务列表 List<Task> taskList = new ArrayList<Task>(); for (int i = 0; i < 108; i++) { taskList.add(new Task(i)); } //设定要启动的工作线程数为 5 个 int threadCount = 5; List<Task>[] taskListPerThread = distributeTasks(taskList, threadCount); System.out.println("实际要启动的工作线程数:"+taskListPerThread.length); for (int i = 0; i < taskListPerThread.length; i++) { Thread workThread = new WorkThread(taskListPerThread[i],i); workThread.start(); } } /** * 把 List 中的任务分配给每个线程,先平均分配,剩于的依次附加给前面的线程 * 返回的数组有多少个元素 (List<Task>) 就表明将启动多少个工作线程 * @param taskList 待分派的任务列表 * @param threadCount 线程数 * @return 列表的数组,每个数组中存有该线程要执行的任务列表 */ public static List<Task>[] distributeTasks(List<Task> taskList, int threadCount) { // 每个线程至少要执行的任务数,假如不为零则表示每个线程都会分配到任务 int minTaskCount = taskList.size() / threadCount; // 平均分配后还剩下的任务数,不为零则还有任务依个附加到前面的线程中 int remainTaskCount = taskList.size() % threadCount; // 实际要启动的线程数,如果工作线程比任务还多 // 自然只需要启动与任务相同个数的工作线程,一对一的执行 // 毕竟不打算实现了线程池,所以用不着预先初始化好休眠的线程 int actualThreadCount = minTaskCount > 0 ? threadCount : remainTaskCount; // 要启动的线程数组,以及每个线程要执行的任务列表 List<Task>[] taskListPerThread = new List[actualThreadCount]; int taskIndex = 0; //平均分配后多余任务,每附加给一个线程后的剩余数,重新声明与 remainTaskCount //相同的变量,不然会在执行中改变 remainTaskCount 原有值,产生麻烦 int remainIndces = remainTaskCount; for (int i = 0; i < taskListPerThread.length; i++) { taskListPerThread[i] = new ArrayList<Task>(); // 如果大于零,线程要分配到基本的任务 if (minTaskCount > 0) { for (int j = taskIndex; j < minTaskCount + taskIndex; j++) { taskListPerThread[i].add(taskList.get(j)); } taskIndex += minTaskCount; } // 假如还有剩下的,则补一个到这个线程中 if (remainIndces > 0) { taskListPerThread[i].add(taskList.get(taskIndex++)); remainIndces--; } } // 打印任务的分配情况 for (int i = 0; i < taskListPerThread.length; i++) { System.out.println("线程 " + i + " 的任务数:" + taskListPerThread[i].size() + " 区间[" + taskListPerThread[i].get(0).getTaskId() + "," + taskListPerThread[i].get(taskListPerThread[i].size() - 1).getTaskId() + "]"); } return taskListPerThread; } } /** * 要执行的任务,可在执行时改变它的某个状态或调用它的某个操作 * 例如任务有三个状态,就绪,运行,完成,默认为就绪态 * 要进一步完善,可为 Task 加上状态变迁的监听器,因之决定UI的显示 */ class Task { public static final int READY = 0; public static final int RUNNING = 1; public static final int FINISHED = 2; private int status; //声明一个任务的自有业务含义的变量,用于标识任务 private int taskId; //任务的初始化方法 public Task(int taskId){ this.status = READY; this.taskId = taskId; } /** * 执行任务 */ public void execute() { // 设置状态为运行中 setStatus(Task.RUNNING); System.out.println("当前线程 ID 是:" + Thread.currentThread().getName() +" | 任务 ID 是:"+this.taskId); // 附加一个延时 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 执行完成,改状态为完成 setStatus(FINISHED); } public void setStatus(int status) { this.status = status; } public int getTaskId() { return taskId; } } /** * 自定义的工作线程,持有分派给它执行的任务列表 */ class WorkThread extends Thread { //本线程待执行的任务列表,你也可以指为任务索引的起始值 private List<Task> taskList = null; private int threadId; /** * 构造工作线程,为其指派任务列表,及命名线程 ID * @param taskList 欲执行的任务列表 * @param threadId 线程 ID */ public WorkThread(List<Task> taskList,int threadId) { this.taskList = taskList; this.threadId = threadId; } /** * 执行被指派的所有任务 */ public void run() { for (Task task : taskList) { task.execute(); } } } |
执行结果如下,注意观察每个线程分配到的任务数量及区间。直到所有的线程完成了所分配到的任务后程序结束:
线程 0 的任务数:22 区间[0,21]
线程 1 的任务数:22 区间[22,43]
线程 2 的任务数:22 区间[44,65]
线程 3 的任务数:21 区间[66,86]
线程 4 的任务数:21 区间[87,107]
实际要启动的工作线程数:5
当前线程 ID 是:Thread-0 | 任务 ID 是:0
当前线程 ID 是:Thread-1 | 任务 ID 是:22
当前线程 ID 是:Thread-2 | 任务 ID 是:44
当前线程 ID 是:Thread-3 | 任务 ID 是:66
当前线程 ID 是:Thread-4 | 任务 ID 是:87
当前线程 ID 是:Thread-0 | 任务 ID 是:1
当前线程 ID 是:Thread-1 | 任务 ID 是:23
当前线程 ID 是:Thread-2 | 任务 ID 是:45
...........................................................................
上面坦白来只算是基本功夫,贴出来还真见笑了。还有更为复杂的功能:
像多线程的下载工具的确更充分利用了网络资源,而且像 FlashGet、NetAnts 都实现了:假如某个线程下载完了欲先所分配段的内容之后,会帮其他线程下载未完成数据,直到任务完成;或某一下载线程的未完成段区间已经很小了,用不着别人来帮忙时,这就涉及到任务的进一步分配。再如,以上两个工具都能动态增加、减小或中止线程,越说越复杂了,它们原本比这复杂多了,这些实现可能定义各种队列来实现,如未完成任务队列、下载中任务队列和已完成队列。难以细究了。
本文链接 https://yanbin.blog/task-dispatch-strategy-demo/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
为何要自己写呢?这不就是线程池,jdk5有ThreadExecutor
@dennis
确如楼上的所言,之前对 Tiger 的 concurrent 略有了解,未作深究。当时只是想不需要用预先初始化好一个连接池,让 任务找线程,只是要 线程找任务。现在才发现,自己是太多虑了。
用 Executor 都不用自己去分配任务了,并且效率更高(粒度更细),不至于某个线程太慢而大大影响了整个进度,应该还能更好的避免线程死锁的情况。
用它的子接口 ExecutorService 可方便监测线程执行状态,从线程获取返回值。谢谢楼上的提醒!
重新发明轮子,也一定发明的好,
用jdk1.5就用concurrent包,用jdk1.4也有concurrent的backport,何必呢。。