某某茶叶有限公司欢迎您!
金沙棋牌在线 > 金沙棋牌在线 > Java线程池(一),java线程池

Java线程池(一),java线程池

时间:2019-12-29 06:38

熟悉java多线程的朋友一定十分了解java的线程池,jdk中的核心实现类为java.util.concurrent.ThreadPoolExecutor。大家可能了解到它的原理,甚至看过它的源码;但是就像我一样,大家可能对它的作用存在误解。现在问题来了,jdk为什么要提供java线程池?使用java线程池对于每次都创建一个新Thread有什么优势?

new Thread的弊端及Java四种线程池的使用:

Java 线程池,java线程池

系统启动一个线程的成本是比较高的,因为它涉及到与操作系统的交互,使用线程池的好处是提高性能,当系统中包含大量并发的线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃,而线程池的最大线程数参数可以控制系统中并发线程数不超过次数。

一、Executors 工厂类用来产生线程池,该工厂类包含以下几个静态工厂方法来创建对应的线程池。创建的线程池是一个ExecutorService对象,使用该对象的submit方法或者是execute方法执行相应的Runnable或者是Callable任务。线程池本身在不再需要的时候调用shutdown()方法停止线程池,调用该方法后,该线程池将不再允许任务添加进来,但是会直到已添加的所有任务执行完成后才死亡。

1、newCachedThreadPool(),创建一个具有缓存功能的线程池,提交到该线程池的任务(Runnable或Callable对象)创建的线程,如果执行完成,会被缓存到CachedThreadPool中,供后面需要执行的任务使用。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CacheThreadPool {
    static class Task implements Runnable {
        @Override
        public void run() {
            System.out.println(this + " " + Thread.currentThread().getName() + " AllStackTraces map size: "
                    + Thread.currentThread().getAllStackTraces().size());
        }
    }

    public static void main(String[] args) {
        ExecutorService cacheThreadPool = Executors.newCachedThreadPool();

        //先添加三个任务到线程池
        for(int i = 0 ; i < 3; i++) {
            cacheThreadPool.execute(new Task());
        }

        //等三个线程执行完成后,再次添加三个任务到线程池
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for(int i = 0 ; i < 3; i++) {
            cacheThreadPool.execute(new Task());
        }
    }

}

执行结果如下:

[email protected] pool-1-thread-1 AllStackTraces map size: 7
[email protected] pool-1-thread-3 AllStackTraces map size: 7
[email protected] pool-1-thread-2 AllStackTraces map size: 7
[email protected] pool-1-thread-3 AllStackTraces map size: 7
[email protected] pool-1-thread-1 AllStackTraces map size: 7
[email protected] pool-1-thread-2 AllStackTraces map size: 7

线程池中的线程对象进行了缓存,当有新任务执行时进行了复用。但是如果有特别多的并发时,缓存线程池还是会创建很多个线程对象。

2、newFixedThreadPool(int nThreads) 创建一个指定线程个数,线程可复用的线程池。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPool {
    static class Task implements Runnable {
        @Override
        public void run() {
            System.out.println(this + " " + Thread.currentThread().getName() + " AllStackTraces map size: "
                    + Thread.currentThread().getAllStackTraces().size());
        }
    }

    public static void main(String[] args) {
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

        // 先添加三个任务到线程池
        for (int i = 0; i < 5; i++) {
            fixedThreadPool.execute(new Task());
        }

        // 等三个线程执行完成后,再次添加三个任务到线程池
        try {
            Thread.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (int i = 0; i < 3; i++) {
            fixedThreadPool.execute(new Task());
        }
    }

}

执行结果:

[email protected] pool-1-thread-2 AllStackTraces map size: 7
[email protected] pool-1-thread-2 AllStackTraces map size: 7
[email protected] pool-1-thread-2 AllStackTraces map size: 7
[email protected] pool-1-thread-1 AllStackTraces map size: 7
[email protected] pool-1-thread-2 AllStackTraces map size: 7
[email protected] pool-1-thread-1 AllStackTraces map size: 7
[email protected] pool-1-thread-3 AllStackTraces map size: 7
[email protected] pool-1-thread-2 AllStackTraces map size: 7

3、newSingleThreadExecutor(),创建一个只有单线程的线程池,相当于调用newFixedThreadPool(1)

4、newSheduledThreadPool(int corePoolSize),创建指定线程数的线程池,它可以在指定延迟后执行线程。也可以以某一周期重复执行某一线程,知道调用shutdown()关闭线程池。

示例如下:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPool {
    static class Task implements Runnable {
        @Override
        public void run() {
            System.out.println("time " + System.currentTimeMillis()  + " " + Thread.currentThread().getName() + " AllStackTraces map size: "
                    + Thread.currentThread().getAllStackTraces().size());
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);

        scheduledExecutorService.schedule(new Task(), 3, TimeUnit.SECONDS);

        scheduledExecutorService.scheduleAtFixedRate(new Task(), 3, 5, TimeUnit.SECONDS);

        try {
            Thread.sleep(30 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        scheduledExecutorService.shutdown();
    }

}

运行结果如下:

time 1458921795240 pool-1-thread-1 AllStackTraces map size: 6
time 1458921795241 pool-1-thread-2 AllStackTraces map size: 6
time 1458921800240 pool-1-thread-1 AllStackTraces map size: 7
time 1458921805240 pool-1-thread-1 AllStackTraces map size: 7
time 1458921810240 pool-1-thread-1 AllStackTraces map size: 7
time 1458921815240 pool-1-thread-1 AllStackTraces map size: 7
time 1458921820240 pool-1-thread-1 AllStackTraces map size: 7

由运行时间可看出,任务是按照5秒的周期执行的。

5、newSingleThreadScheduledExecutor() 创建一个只有一个线程的线程池,同调用newScheduledThreadPool(1)。

二、ForkJoinPool和ForkJoinTask

ForkJoinPool是ExecutorService的实现类,支持将一个任务划分为多个小任务并行计算,在把多个小任务的计算结果合并成总的计算结果。它有两个构造函数

ForkJoinPool(int parallelism)创建一个包含parallelism个并行线程的ForkJoinPool。

ForkJoinPool(),以Runtime.availableProcessors()方法返回值作为parallelism参数来创建ForkJoinPool。

ForkJoinTask 代表一个可以并行,合并的任务。它是实现了Future<T>接口的抽象类,它有两个抽象子类,代表无返回值任务的RecuriveAction和有返回值的RecursiveTask。可根据具体需求继承这两个抽象类实现自己的对象,然后调用ForkJoinPool的submit 方法执行。

RecuriveAction 示例如下,实现并行输出0-300的数字。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class ActionForkJoinTask {
    static class PrintTask extends RecursiveAction {
        private static final int THRESHOLD = 50;
        private int start;
        private int end;

        public PrintTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            if (end - start < THRESHOLD) {
                for(int i = start; i < end; i++) {
                    System.out.println(Thread.currentThread().getName() + " " + i);
                }
            } else {
                int middle = (start + end) / 2;
                PrintTask left = new PrintTask(start, middle);
                PrintTask right = new PrintTask(middle, end);
                left.fork();
                right.fork();
            }
        }

    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();

        pool.submit(new PrintTask(0,  300));
        try {
            pool.awaitTermination(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        pool.shutdown();
    }

}

在拆分小任务后,调用任务的fork()方法,加入到ForkJoinPool中并行执行。

RecursiveTask示例,实现并行计算100个整数求和。拆分为每20个数求和后获取结果,在最后合并为最后的结果。

import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class TaskForkJoinTask {
    static class CalTask extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 20;

        private int arr[];
        private int start;
        private int end;

        public CalTask(int[] arr, int start, int end) {
            this.arr = arr;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            int sum = 0;

            if (end - start < THRESHOLD) {
                for (int i = start; i < end; i++) {
                    sum += arr[i];
                }
                System.out.println(Thread.currentThread().getName() + "  sum:" + sum);
                return sum;
            } else {
                int middle = (start + end) / 2;
                CalTask left = new CalTask(arr, start, middle);
                CalTask right = new CalTask(arr, middle, end);

                left.fork();
                right.fork();

                return left.join() + right.join();
            }
        }

    }

    public static void main(String[] args) {
        int arr[] = new int[100];
        Random random = new Random();
        int total = 0;

        for (int i = 0; i < arr.length; i++) {
            int tmp = random.nextInt(20);
            total += (arr[i] = tmp);
        }
        System.out.println("total " + total);

        ForkJoinPool pool = new ForkJoinPool(4);

        Future<Integer> future = pool.submit(new CalTask(arr, 0, arr.length));
        try {
            System.out.println("cal result: " + future.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        pool.shutdown();
    }

}

执行结果如下:

total 912
ForkJoinPool-1-worker-2  sum:82
ForkJoinPool-1-worker-2  sum:123
ForkJoinPool-1-worker-2  sum:144
ForkJoinPool-1-worker-3  sum:119
ForkJoinPool-1-worker-2  sum:106
ForkJoinPool-1-worker-2  sum:128
ForkJoinPool-1-worker-2  sum:121
ForkJoinPool-1-worker-3  sum:89
cal result: 912

子任务执行完后,调用任务的join()方法获取子任务执行结果,再相加获得最后的结果。

 

线程池,java线程池 系统启动一个线程的成本是比较高的,因为它涉及到与操作系统的交互,使用线程池的好处是提高性能,当系统中包...

Java线程池(一),java线程池

对线程池的误解

很长一段时间里我一直以为java线程池是为了提高多线程下创建线程的效率。创建好一些线程并缓存在线程池里,后面来了请求(Runnable)就从连接池中取出一个线程处理请求;这样就避免了每次创建一个新Thread对象。直到前段时间我看到一篇Neal Gafter(和Joshua Bloch合著了《Java Puzzlers》,现任职于微软,主要从事.NET语言方面的工作)的访谈,里面有这么一段谈话(http://www.infoq.com/cn/articles/neal-gafter-on-java):

图片 1

乍一看,大神的思路就是不一样:java线程池是为了防止java线程占用太多资源?

虽然是java大神的访谈,但是也不能什么都信,你说占资源就占资源?还是得写测试用例测一下。

首先验证下我的理解:

new Thread的弊端

new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
}).start();

为何使用线程池?

  • 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  • 第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

  • 第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

java线程池和创建java线程哪个效率高?

直接上测试用例:

public class ThreadPoolTest extends TestCase {
    private static final int COUNT = 10000;

    public void testThreadPool() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(COUNT);
        ExecutorService executorService = Executors.newFixedThreadPool(100);
        long bg = System.currentTimeMillis();
        for (int i = 0; i < COUNT; i++) {
        Runnable command = new TestRunnable(countDownLatch);
        executorService.execute(command);
        }
        countDownLatch.await();
        System.out.println("testThreadPool:" + (System.currentTimeMillis() - bg));
    }

    public void testNewThread() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(COUNT);
        long bg = System.currentTimeMillis();
        for (int i = 0; i < COUNT; i++) {
        Runnable command = new TestRunnable(countDownLatch);
        Thread thread = new Thread(command);
        thread.start();
        }
        countDownLatch.await();
        System.out.println("testNewThread:" + (System.currentTimeMillis() - bg));
    }

    private static class TestRunnable implements Runnable {
        private final CountDownLatch countDownLatch;

        TestRunnable(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
        countDownLatch.countDown();
        }
    }
}

这里使用Executors.newFixedThreadPool(100)是为了控制线程池的核心连接数和最大连接数一样大,都为100。

我的机子上的测试结果:

testThreadPool:31
testNewThread:624

可以看到,使用线程池处理10000个请求的处理时间为31ms,而每次启用新线程的处理时间为624ms。

好了,使用线程池确实要比每次都创建新线程要快一些;但是testNewThread一共耗时624ms,算下平均每次请求的耗时为:

624ms/10000=62.4us

每次创建并启动线程的时间为62.4微秒。根据80/20原理,这点儿时间根本可以忽略不计。所以线程池并不是为了效率设计的。

new Thread的弊端如下:

  • a. 每次new Thread新建对象性能差。
  • b. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
  • c. 缺乏更多功能,如定时执行、定期执行、线程中断。

线程池如何处理任务?

先看线程池处理任务的机制图:

 

 

 

 

 

 

 

 

 

 

 

 

 

从上图中可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:

  1. 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。

  2. 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。

  3. 线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

ThreadPoolExecutor执行execute方法分下面4种情况:

  1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

  2. 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

  3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

  4. 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

下面是ThreadPoolExecutor的执行流程图:

java线程池是为了节约资源?

再上测试用例:

public class ThreadPoolTest extends TestCase {
    public void testThread() throws InterruptedException {
        int i = 1;
        while (true) {
        Runnable command = new TestRunnable();
        Thread thread = new Thread(command);
        thread.start();
        System.out.println(i++);
        }
    }

    private static class TestRunnable implements Runnable {
        @Override
        public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        }
    }
}

以上用例模拟每次请求都创建一个新线程处理请求,然后默认每个请求的处理时间为1000ms。而在我的机子上当请求数达到1096时会内存溢出:

java.lang.OutOfMemoryError: unable to create new native thread

为什么会抛OOM Error呢?因为jvm会为每个线程分配一定内存(JDK5.0以后每个线程堆栈大小为1M,以前每个线程堆栈大小为256K,也可以通过jvm参数-Xss来设置),所以当线程数达到一定数量时就报了该error。

设想如果不使用java线程池,而为每个请求都创建一个新线程来处理该请求,当请求量达到一定数量时一定会内存溢出的;而我们使用java线程池的话,线程数量一定会<=maximumPoolSize(线程池的最大线程数),所以设置合理的话就不会造成内存溢出。

现在问题明朗了:java线程池是为了防止内存溢出,而不是为了加快效率。

相比new Thread,Java提供的四种线程池的好处在于:

  • a. 重用存在的线程,减少对象创建、消亡的开销,性能佳。
  • b. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
  • c. 提供定时执行、定期执行、单线程、并发数控制等功能。

 

浅谈java线程池

上文介绍了java线程池启动太多会造成OOM,使用java线程池也应该设置合理的线程数数量;否则应用可能十分不稳定。然而该如何设置这个数量呢?我们可以通过这个公式来计算:

(MaxProcessMemory – JVMMemory – ReservedOsMemory) / (ThreadStackSize) = Max number of threads

  • MaxProcessMemory     进程最大的内存
  • JVMMemory                 JVM内存
  • ReservedOsMemory     JVM的本地内存
  • ThreadStackSize            线程栈的大小

Java提供的四种线程池

  1. newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  2. newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  3. newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
  4. newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。