[java] Apache Storm의 Java 카운팅 기능 사용 방법

Apache Storm은 대규모 분산 스트림 처리 시스템으로, 대량의 실시간 데이터를 처리할 수 있습니다. 이러한 데이터 처리에는 종종 특정 이벤트의 카운트가 필요합니다. 여기에서는 Apache Storm의 Java 카운팅 기능을 사용하는 방법을 알아보겠습니다.

1. Maven 의존성 추가

먼저 Maven 프로젝트를 생성하거나 기존 프로젝트에 아래의 의존성을 추가해야 합니다.

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>2.2.0</version>
</dependency>

위의 의존성을 추가하면 Storm의 핵심 라이브러리를 프로젝트에 사용할 수 있습니다.

2. Topology 작성

다음으로, Storm Topology를 작성해야 합니다. Topology는 Storm의 작업 플로우를 정의하는 것으로, 데이터가 처리되는 방식을 결정합니다.

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class CountingTopology {

    public static void main(String[] args) {
        // TopologyBuilder를 사용해 Topology를 생성합니다.
        TopologyBuilder builder = new TopologyBuilder();

        // Spout 생성
        builder.setSpout("wordSpout", new WordSpout(), 1);

        // Bolt 생성
        builder.setBolt("countBolt", new CountBolt(), 2)
                .fieldsGrouping("wordSpout", new Fields("word"));

        // Topology 실행을 위한 Configuration 설정
        Config config = new Config();
        config.setDebug(true);

        // LocalCluster를 사용해 Topology를 실행합니다.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("countingTopology", config, builder.createTopology());

        // Topology가 10초간 실행되도록 합니다.
        Utils.sleep(10000);

        // Topology 실행을 멈춥니다.
        cluster.killTopology("countingTopology");
        cluster.shutdown();
    }
}

위 예제에서 WordSpoutCountBolt는 사용자가 정의한 Spout와 Bolt입니다. Spout는 데이터를 생성하는 역할을하고, Bolt는 데이터를 처리하고 결과를 저장하는 역할을 합니다.

3. Spout 작성

데이터를 생성하는 Spout를 작성해야 합니다. 아래는 간단한 예시입니다.

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class WordSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    @Override
    public void open(Map<String, Object> config, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // 데이터 생성
        String[] words = { "apple", "banana", "orange", "apple", "pear", "banana" };

        // 생성한 데이터를 Tuple로 emit
        for (String word : words) {
            collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

위의 예제에서 nextTuple() 메서드에서는 원하는 데이터를 생성하고, collector.emit()을 사용하여 생성한 데이터를 Tuple로 emit합니다.

4. Bolt 작성

데이터를 처리하고 결과를 저장하는 Bolt를 작성해야 합니다. 아래는 간단한 예시입니다.

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class CountBolt extends BaseRichBolt {

    private OutputCollector collector;
    private Map<String, Integer> countMap;

    @Override
    public void prepare(Map<String, Object> config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.countMap = new HashMap<>();
    }

    @Override
    public void execute(Tuple tuple) {
        // Tuple에서 데이터를 읽어옴
        String word = tuple.getStringByField("word");

        // 맵에서 해당 단어의 카운트 가져오기
        Integer count = countMap.getOrDefault(word, 0);

        // 카운트 증가
        count++;

        // 맵에 증가된 카운트 저장
        countMap.put(word, count);

        // 결과를 Tuple로 emit
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

위의 예제에서 execute() 메서드는 Tuple에서 데이터를 읽어와 해당 단어의 카운트를 증가시키고, 증가된 카운트를 결과로 emit합니다.

5. Topology 실행 및 결과 확인

위의 코드들을 작성한 후, CountingTopology 클래스를 실행하면 Topology가 실행되고 10초 동안 데이터가 처리됩니다. 처리된 결과는 CountBolt에서 출력됩니다.

이렇게 Apache Storm의 Java 카운팅 기능을 사용하여 대규모 분산 스트림 처리를 구현할 수 있습니다.

참고 자료