[java] Java와 Apache Storm을 활용한 대용량 데이터 처리

Apache Storm은 대용량 실시간 스트림 데이터 처리를 위한 분산 컴퓨팅 프레임워크입니다. Java로 Storm을 개발하여 대용량 데이터를 처리하는 방법에 대해 알아보겠습니다.

필요한 도구 설치

먼저, Storm 개발을 위해 필요한 도구들을 설치해야 합니다.

1. JDK 설치

Storm은 Java로 개발되었기 때문에 JDK(Java Development Kit) 설치가 필요합니다.

2. Apache Storm 설치

Apache Storm은 공식 웹사이트에서 다운로드하고 설치할 수 있습니다. 압축 파일을 다운로드한 후에 적절한 디렉토리에 압축을 풀어주면 됩니다.

Topology 개발

Storm에서 Topology는 데이터 처리의 전체 구조를 정의하는 개념입니다. Topology는 Spout와 Bolt로 구성되며, Spout는 데이터를 생성하여 Bolt로 전달하고, Bolt는 데이터를 처리하거나 다른 Bolt로 전달하는 역할을 합니다.

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
    public static void main(String[] args) {
        // TopologyBuilder 생성
        TopologyBuilder builder = new TopologyBuilder();
        
        // Spout 정의
        builder.setSpout("spout", new WordGeneratorSpout());
        
        // Bolt 정의
        builder.setBolt("split", new SplitBolt()).shuffleGrouping("spout");
        builder.setBolt("count", new CountBolt()).fieldsGrouping("split", new Fields("word"));
        
        // Config 생성
        Config config = new Config();
        config.setDebug(true);
        
        // LocalCluster 실행
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count-topology", config, builder.createTopology());
    }
}

위의 코드는 WordCountTopology를 정의하는 예제입니다. TopologyBuilder를 사용하여 Spout과 Bolt를 설정하고, Config를 설정한 후 LocalCluster를 생성하여 Topology를 실행합니다.

Spout 개발

Spout는 데이터를 생성하여 Topology 내부로 전달하는 역할을 합니다. 실제 데이터 소스와 연결되어 데이터를 읽고, Bolt로 전달합니다.

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class WordGeneratorSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // 데이터 생성 및 전달
        String sentence = "Apache Storm is a distributed stream processing computation framework.";
        String[] words = sentence.split(" ");
        for (String word : words) {
            collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

위의 코드는 WordGeneratorSpout를 정의한 예제입니다. BaseRichSpout 클래스를 상속받아 원하는 데이터를 생성하여 Bolt로 전달할 수 있습니다.

Bolt 개발

Bolt는 Spout로부터 전달받은 데이터를 처리하는 역할을 합니다.

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class CountBolt extends BaseBasicBolt {
    private Map<String, Integer> wordCountMap;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        wordCountMap = new HashMap<>();
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        int count = wordCountMap.getOrDefault(word, 0) + 1;
        wordCountMap.put(word, count);

        System.out.println("Word: " + word + ", Count: " + count);
    }
    
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // OutputFieldsDeclarer를 통해 별도의 출력 필드를 선언하지 않음
    }
}

위의 코드는 CountBolt를 정의한 예제입니다. BaseBasicBolt 클래스를 상속받아 Spout로부터 전달받은 데이터를 카운트하여 출력하는 기능을 구현하였습니다.

Topology 실행

이제 Topology를 실행해보겠습니다.

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
    public static void main(String[] args) {
        // TopologyBuilder 생성
        TopologyBuilder builder = new TopologyBuilder();
        
        // Spout 정의
        builder.setSpout("spout", new WordGeneratorSpout());
        
        // Bolt 정의
        builder.setBolt("split", new SplitBolt()).shuffleGrouping("spout");
        builder.setBolt("count", new CountBolt()).fieldsGrouping("split", new Fields("word"));
        
        // Config 생성
        Config config = new Config();
        config.setDebug(true);
        
        // LocalCluster 실행
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count-topology", config, builder.createTopology());
    }
}

위 코드를 실행하면 Topology가 실행되며, Spout가 데이터를 생성하고 Bolt가 데이터를 처리하는 과정이 진행됩니다.

결론

Java와 Apache Storm을 활용하여 대용량 데이터를 처리하는 방법에 대해 알아보았습니다. Storm을 이용하면 대용량 실시간 데이터를 효율적으로 처리할 수 있으며, 위의 예제를 참고하여 개발과 실행을 진행해 볼 수 있습니다.