[java] 자바를 이용한 아파치 하둡 파이프라인 개발 방법

아파치 하둡은 대용량의 데이터를 처리하고 분석하기 위한 분산 프레임워크입니다. 이 문서에서는 자바를 사용하여 아파치 하둡 파이프라인을 개발하는 방법에 대해 알아보겠습니다.

1. 아파치 하둡 설치하기

먼저, 아파치 하둡을 설치해야 합니다. 아파치 하둡 공식 웹사이트에서 최신 버전을 다운로드하고, 설치 가이드를 따라 설치합니다.

2. 자바 개발 환경 설정하기

아파치 하둡을 사용하여 파이프라인을 개발하기 위해서는 자바 개발 환경을 설정해야 합니다. 자바 JDK를 다운로드하고, 환경 변수를 설정합니다.

3. 아파치 하둡 파이프라인 개발하기

3.1. 데이터 입력 단계

아파치 하둡 파이프라인의 첫 번째 단계는 데이터를 입력하는 것입니다. 이를 위해 자바로 작성된 코드를 사용할 수 있습니다. 아래는 데이터를 입력하는 예제 코드입니다.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class DataInputJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Data Input Job");
        
        job.setJarByClass(DataInputJob.class);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("input-data.txt"));
        
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("output-data"));
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        job.waitForCompletion(true);
    }
}

위 코드에서는 TextInputFormat을 사용하여 입력 데이터를 읽고, TextOutputFormat을 사용하여 결과를 출력합니다. input-data.txt라는 파일이 입력 데이터로 사용되며, 결과는 output-data 디렉토리에 저장됩니다.

3.2. 데이터 처리 단계

데이터 입력 단계 이후, 데이터를 처리해야 합니다. 아파치 하둡은 맵리듀스 모델을 사용하여 데이터를 처리합니다. 이를 위해 맵 함수와 리듀스 함수를 자바로 작성해야 합니다. 아래는 데이터를 처리하는 예제 코드입니다.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.StringTokenizer;

public class DataProcessingJob {
    public static class DataMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private Text word = new Text();
        
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, ONE);
            }
        }
    }
    
    public static class DataReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Data Processing Job");
        
        job.setJarByClass(DataProcessingJob.class);
        job.setMapperClass(DataMapper.class);
        job.setCombinerClass(DataReducer.class);
        job.setReducerClass(DataReducer.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        FileInputFormat.addInputPath(job, new Path("output-data"));
        FileOutputFormat.setOutputPath(job, new Path("output-result"));
        
        job.waitForCompletion(true);
    }
}

위 코드에서는 DataMapper 클래스에서 입력 데이터를 맵 함수로 처리하고, DataReducer 클래스에서 중간 결과를 리듀스 함수로 처리합니다. 이후, 결과는 output-result 디렉토리에 저장됩니다.

4. 아파치 하둡 파이프라인 실행하기

파이프라인 개발이 완료되면, 아파치 하둡을 실행하여 파이프라인을 실행할 수 있습니다. 아래의 명령어를 사용하여 아파치 하둡 파이프라인을 실행합니다.

$ hadoop jar pipeline.jar DataInputJob
$ hadoop jar pipeline.jar DataProcessingJob

위 명령어에서 pipeline.jar는 개발한 파이프라인의 JAR 파일을 나타냅니다.

결론

이 문서에서는 자바를 사용하여 아파치 하둡 파이프라인을 개발하는 방법을 알아보았습니다. 아파치 하둡을 사용하여 데이터를 입력하고 처리하는 과정을 자바 코드로 작성하여 자신만의 데이터 분석 파이프라인을 개발할 수 있습니다.

참고 자료