Multi Thread

2019. 7. 30. 15:05 · Programming/Spring

반응형

AsyncConfig.java

package com.mobileleader.batch.async;

import java.util.concurrent.Executor;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import com.mobileleader.batch.exception.AsyncExceptionHandler;
/**
 * Spring configuration that enables {@code @Async} processing and supplies the
 * thread pool and uncaught-exception handler used for asynchronous methods.
 */
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
    private static final Logger logger = LoggerFactory.getLogger(AsyncConfig.class);

    /** Core (base) number of threads in the pool. */
    private static final int TASK_CORE_POOL_SIZE = 10;
    /** Maximum number of threads in the pool. */
    private static final int TASK_MAX_POOL_SIZE = 20;
    /** Capacity of the queue that holds tasks waiting for a thread. */
    private static final int TASK_QUEUE_CAPACITY = 200;
    /** Name passed to the executor through {@code setBeanName}. */
    private static final String EXECUTOR_BEAN_NAME = "executor";

    /**
     * The executor registered as bean {@code executorDownload} by
     * {@link #getAsyncExecutor()}, injected back here by name so the pool can be inspected.
     *
     * NOTE(review): this injects a bean that is declared by this same configuration
     * class; confirm that the container in use permits this self-reference.
     */
    @Resource(name = "executorDownload")
    private ThreadPoolTaskExecutor executorDownload;

    /**
     * Creates the thread pool used for {@code @Async} methods and exposes it as the
     * {@code executorDownload} bean.
     *
     * @return a {@link ThreadPoolTaskExecutor} configured with the core size, max size
     *         and queue capacity declared in this class
     */
    @Bean(name = "executorDownload")
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(TASK_CORE_POOL_SIZE);
        executor.setMaxPoolSize(TASK_MAX_POOL_SIZE);
        executor.setQueueCapacity(TASK_QUEUE_CAPACITY);
        executor.setBeanName(EXECUTOR_BEAN_NAME);
        executor.initialize();
        return executor;
    }

    /**
     * Tells whether {@code createCnt} more tasks can be submitted without exceeding the
     * queue capacity.
     *
     * @param createCnt number of tasks the caller wants to submit
     * @return {@code false} when the number of tasks currently waiting in the queue plus
     *         {@code createCnt} is greater than the queue capacity, {@code true} otherwise
     */
    public boolean isTaskExecute(int createCnt) {
        int queuedTasks = executorDownload.getThreadPoolExecutor().getQueue().size();
        return queuedTasks + createCnt <= TASK_QUEUE_CAPACITY;
    }

    /**
     * Reports how many pool threads are currently executing tasks.
     *
     * @return the active thread count reported by the {@code executorDownload} pool
     */
    public int getActiveCount() {
        return executorDownload.getActiveCount();
    }

    /**
     * Supplies the handler invoked when an {@code @Async} method throws an exception
     * that nobody catches.
     *
     * @return a new {@link AsyncExceptionHandler}
     * @see org.springframework.scheduling.annotation.AsyncConfigurer#getAsyncUncaughtExceptionHandler()
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncExceptionHandler();
    }
}

 

AsyncTaskService.java

package com.mobileleader.batch.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * Service whose {@link #executorDownload(String)} method is annotated with
 * {@code @Async}; used here as a simulation workload.
 */
@Service
public class AsyncTaskService {
    private static final Logger logger = LoggerFactory.getLogger(AsyncTaskService.class);

    /**
     * Simulation task: runs {@link #testMethod(String)} and logs when it has finished.
     *
     * @param str label identifying this task in the log output
     */
    @Async
    public void executorDownload(String str) {
        // LOG: record the task start here
        // ...

        testMethod(str);

        // LOG: record the task end here
        // ...
        logger.debug("==============>>>>>>>>>>>> THREAD END : {}", str);
    }

    /**
     * Simulates work by sleeping for 500 ms.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt flag is restored
     * so the caller and the owning pool can still observe it, and the method returns early.
     *
     * @param str label identifying the task (used only for logging)
     */
    public void testMethod(String str) {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again instead of hiding it.
            Thread.currentThread().interrupt();
            logger.debug("Test method interrupted : {}", str, e);
        }
    }

}

 

AsyncExceptionHandler.java

package com.mobileleader.batch.exception;

import java.lang.reflect.Method;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;

/**
 * Logs exceptions that escape from {@code @Async} methods: the exception (with stack
 * trace), the name of the failing method and every argument it was called with.
 */
public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
    private static final Logger logger = LoggerFactory.getLogger(AsyncExceptionHandler.class);

    /**
     * Writes the failure details to the error log.
     *
     * @param throwable the exception thrown by the asynchronous method
     * @param method    the asynchronous method that failed
     * @param obj       the arguments the method was invoked with
     */
    @Override
    public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {
        logger.error("==============>>>>>>>>>>>> THREAD ERROR");
        // The trailing throwable makes SLF4J print the stack trace, not only the message.
        logger.error("Exception Message :: {}", throwable.getMessage(), throwable);
        logger.error("Method Name :: {}", method.getName());
        for (Object param : obj) {
            logger.error("Parameter Value :: {}", param);
        }

        // JOB_LOG: record the job end here
        // ...
        logger.error("==============>>>>>>>>>>>> THREAD ERROR END");
    }

}

 

DownloadThread.java

package com.mobileleader.batch.thread;

import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.stereotype.Service;

import com.mobileleader.batch.async.AsyncConfig;
import com.mobileleader.batch.service.AsyncTaskService;

/**
 * Worker that repeatedly submits a batch of simulated tasks to
 * {@link AsyncTaskService} whenever {@link AsyncConfig#isTaskExecute(int)} reports
 * that the batch can be accepted.
 */
@Service
public class DownloadThread extends BaseThread {

    private static final Logger logger = LoggerFactory.getLogger(DownloadThread.class);

    /** Number of task labels generated and submitted per loop iteration. */
    private static final int BATCH_SIZE = 200;

    /** Pause between iterations, so the log output stays readable. */
    private static final long LOOP_DELAY_MILLIS = 1000L;

    /** Service whose asynchronous method is invoked for every label. */
    @Autowired
    private AsyncTaskService asyncTaskService;

    /** Async configuration, used to check whether a batch can be submitted. */
    @Autowired
    private AsyncConfig asyncConfig;

    /**
     * Main loop. Each iteration builds {@code BATCH_SIZE} labels made of a prefix
     * character and a running number (for example {@code A1 .. A200}), submits them if
     * the capacity check passes, and moves on to the next prefix character after a
     * batch has been submitted completely.
     *
     * <p>The loop ends when the running thread is interrupted; the interrupt flag is
     * left set for the caller.
     */
    @Override
    public void run() {
        char c = 'A';
        while (!Thread.currentThread().isInterrupted()) {
            logger.debug("downloadThread run.");
            try {
                List<String> strList = new ArrayList<String>(BATCH_SIZE);
                for (int i = 0; i < BATCH_SIZE; i++) {
                    strList.add(String.valueOf(c) + (i + 1));
                }

                // Submit only when the whole batch is reported to fit.
                if (asyncConfig.isTaskExecute(strList.size())) {
                    for (String label : strList) {
                        asyncTaskService.executorDownload(label);
                    }
                    c++;
                } else {
                    logger.debug("==============>>>>>>>>>>> M > THREAD 개수 초과");
                }
            } catch (TaskRejectedException e) {
                // Thrown when the executor cannot accept another task.
                logger.debug("==============>>>>>>>>>>>> THREAD ERROR");
                logger.debug("TaskRejectedException : 등록 개수 초과", e);
                logger.debug("==============>>>>>>>>>>>> THREAD END");
            }

            logger.debug("ActiveCount : {}", asyncConfig.getActiveCount());

            // The loop runs too fast to follow the log otherwise, so pause briefly.
            try {
                Thread.sleep(LOOP_DELAY_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag cleared by the exception and stop the worker.
                Thread.currentThread().interrupt();
                logger.debug("downloadThread interrupted; stopping.", e);
                break;
            }
        }
    }
}
반응형