Java基础回顾(四)

36)线程状态:

  • NEW 刚创建 (new)
  • RUNNABLE 就绪 (start)
  • RUNNING 运行中 (run)
  • BLOCK 阻塞 (sleep)
  • TERMINATED 结束

线程被动地暂停和停止:不能及时释放资源。

线程主动暂停和停止:

  • 定期检测共享变量
  • 如果需要暂停和停止,先释放变量,再主动动作
  • 暂停:Thread.sleep(),休眠
  • 终止:run方法结束,线程终止

在线程中监控volatile变量,在发现变量改变时可以做出对应的变化,释放相应的资源。

37)普通线程:run方法运行结束时结束。

守护线程:run方法运行结束或main函数结束时结束。{.setDaemon(true);}

38)并行编程的困难:任务分配和执行过程的高度耦合。

并行模式:

  • 主从模式(Master-slave)
  • Worker模式(Worker-Worker)

Java并发编程:

  • Thread/Runnable/Thread组管理
  • Executor
  • Fork-Join框架

线程组Thread Group

  • 线程的集合
  • 树形结构,大线程组可以包括小线程组
  • 可以通过enumerate方法遍历组内的线程,执行操作
  • 能够有效管理多个线程,但是管理效率低
  • 任务分配和执行过程高度耦合
  • 重复创建线程、关闭线程操作,无法重用线程

Executor

  • 分离任务的创建和执行者的创建
  • 线程重复利用(new线程的代价过大)

共享线程池的理念

  • 预设好的多个Thread,可弹性增加
  • 多次执行很多很小的任务
  • 任务创建和执行过程解耦
  • 程序员无需关心线程池执行任务过程

Executor的主要类:ExecutorService、ThreadPoolExecutor、Future

  • Executor.newCachedThreadPool/newFixedThreadPool创建线程池、
  • ExecutorService线程池服务
  • Callable具体的逻辑对象(线程类){Callable和Runnable是等价的,而且Callable的call方法可以有返回值}
  • Future返回结果
public class Server {
	
	//线程池
	private ThreadPoolExecutor executor;
	
	public Server(){
		executor=(ThreadPoolExecutor)Executors.newCachedThreadPool();
		//executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(5);
	}
	
	//向线程池提交任务
	public void submitTask(Task task){
		System.out.printf("Server: A new task has arrived\n");
		executor.execute(task); //执行  无返回值
		
		System.out.printf("Server: Pool Size: %d\n",executor.getPoolSize());
		System.out.printf("Server: Active Count: %d\n",executor.getActiveCount());
		System.out.printf("Server: Completed Tasks: %d\n",executor.getCompletedTaskCount());
	}

	public void endServer() {
		executor.shutdown();
	}
}
public class Main {

	public static void main(String[] args) throws InterruptedException {
		// 创建一个执行服务器
		Server server=new Server();

		// 创建100个任务,并发给执行器,等待完成
		for (int i=0; i<100; i++){
			Task task=new Task("Task "+i);
			Thread.sleep(10);
			server.submitTask(task);//不关心线程池如何去完成100个任务
		}		
		server.endServer();
	}
}
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
 * Task 任务类
 * @author Tom
 *
 */
public class Task implements Runnable {

	private String name;
	
	public Task(String name){
		this.name=name;
	}
	
	public void run() {
		try {
			Long duration=(long)(Math.random()*1000);
			System.out.printf("%s: Task %s: Doing a task during %d seconds\n",Thread.currentThread().getName(),name,duration);
			Thread.sleep(duration);			
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		System.out.printf("%s: Task %s: Finished on: %s\n",Thread.currentThread().getName(),name,new Date());
	}

}

Fork-Join(分治编程)

适用于整体计算量不太确定的编程。(最小任务量可确定)(类似归并排序思想)

Fork-Join的关键类

  • ForkJoinPool任务池
  • RecursiveAction(用于没有返回结果的任务)
  • RecursiveTask(用于有返回值的任务)
import java.math.BigInteger;
import java.util.concurrent.RecursiveTask;

//分任务求和
public class SumTask extends RecursiveTask<Long> {
	
	private int start;
	private int end;

	public SumTask(int start, int end) {
		this.start = start;
		this.end = end;
	}

	public static final int threadhold = 5;

	@Override
	protected Long compute() {
		Long sum = 0L;
		
		// 如果任务足够小, 就直接执行
		boolean canCompute = (end - start) <= threadhold;
		if (canCompute) {
			for (int i = start; i <= end; i++) {
				sum = sum + i;				
			}
		} else {
			// 任务大于阈值, 分裂为2个任务
			int middle = (start + end) / 2;
			SumTask subTask1 = new SumTask(start, middle);
			SumTask subTask2 = new SumTask(middle + 1, end);

			invokeAll(subTask1, subTask2);

			Long sum1 = subTask1.join(); //用join阻塞,等待返回结果
			Long sum2 = subTask2.join();

			// 结果合并
			sum = sum1 + sum2;
		}
		return sum;
	}
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

//分任务求和
public class SumTest {
    
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //创建执行线程池
    	ForkJoinPool pool = new ForkJoinPool();
    	//ForkJoinPool pool = new ForkJoinPool(4);
    	
    	//创建任务
        SumTask task = new SumTask(1, 10000000);
        
        //提交任务
        ForkJoinTask<Long> result = pool.submit(task);
        
        //等待结果
        do {
			System.out.printf("Main: Thread Count: %d\n",pool.getActiveThreadCount());
			System.out.printf("Main: Paralelism: %d\n",pool.getParallelism());
			try {
				Thread.sleep(50);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		} while (!task.isDone());
        
        //输出结果
        System.out.println(result.get().toString());
    }
}