[java] Java를 사용하여 Apache Storm의 실시간 마이크로 배치 처리하기

Apache Storm은 대용량 데이터를 실시간으로 처리할 수 있는 오픈 소스 분산 실시간 컴퓨팅 프레임워크입니다. Storm은 기본적으로 Java로 작성되어 있으며, 이를 활용하여 실시간 마이크로 배치 처리를 구현할 수 있습니다.

1. Apache Storm 설치

먼저 Apache Storm을 설치해야 합니다. 아래 링크에서 Apache Storm을 다운로드할 수 있습니다.

Apache Storm 다운로드 페이지

Apache Storm을 다운로드한 후 압축을 해제하고, 환경 변수를 설정해야 합니다. 자세한 설치 방법은 Apache Storm 공식 문서를 참고해주세요.

2. Topology 작성

Topology는 Apache Storm에서 데이터 흐름을 정의하는 역할을 합니다. Topology는 여러 개의 Spout와 Bolt로 구성됩니다. Spout는 데이터의 소스를 나타내며, Bolt는 데이터를 가공하거나 처리하는 단계를 나타냅니다.

다음은 간단한 Topology 예제입니다.

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class MyTopology {
    public static void main(String[] args) throws Exception {
        // TopologyBuilder를 사용하여 Topology를 생성
        TopologyBuilder builder = new TopologyBuilder();

        // Spout 설정
        builder.setSpout("spout", new MySpout(), 1);

        // Bolt 설정
        builder.setBolt("bolt", new MyBolt(), 1)
               .fieldsGrouping("spout", new Fields("field"));

        // Config 생성
        Config config = new Config();
        config.setDebug(true);

        // LocalCluster를 사용하여 Topology 실행
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("my-topology", config, builder.createTopology());

        // Topology 실행 후 특정 시간 동안 대기
        Thread.sleep(10000);

        // Topology 종료
        cluster.shutdown();
    }
}

위 예제에서는 Topology에 “spout”와 “bolt”라는 이름의 Spout와 Bolt를 설정하고, “spout”에서 나온 데이터를 “bolt”로 전달하도록 설정하였습니다. Config를 통해 디버그 모드를 활성화하고, LocalCluster를 사용하여 Topology를 실행하는 예제입니다.

3. Spout 구현

Spout는 데이터의 소스를 나타냅니다. 실제 데이터 소스인 데이터베이스, 파일, 외부 API 등과의 연동을 구현할 수 있습니다. 아래는 간단한 Spout 예제입니다.

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;

public class MySpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // 데이터 소스로부터 데이터를 가져와서 Bolt로 전송
        String data = getDataFromSource();

        // 데이터를 Tuple로 변환하여 Emit
        collector.emit(new Values(data));

        // 일정 시간 대기
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("field"));
    }

    private String getDataFromSource() {
        // 데이터 소스로부터 데이터를 가져오는 로직
        return "Data from source";
    }
}

위의 예제에서는 Spout를 구현하기 위해 BaseRichSpout 클래스를 상속하였습니다. open 메서드에서 초기화 작업을 수행하고, nextTuple 메서드에서 데이터 소스로부터 데이터를 가져와 Bolt로 전송하도록 구현하였습니다. declareOutputFields 메서드는 Emit하는 Tuple의 스키마를 정의합니다.

4. Bolt 구현

Bolt는 데이터를 가공하거나 처리하는 단계를 나타냅니다. 아래는 간단한 Bolt 예제입니다.

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class MyBolt extends BaseBasicBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // 입력으로 들어온 데이터를 가공 또는 처리하는 로직
        String data = input.getStringByField("field");
        String processedData = processData(data);

        // 가공한 데이터를 Emit
        collector.emit(new Values(processedData));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("processedField"));
    }

    private String processData(String data) {
        // 데이터를 가공하는 로직
        return "Processed data: " + data;
    }
}

위의 예제에서는 Bolt를 구현하기 위해 BaseBasicBolt 클래스를 상속하였습니다. prepare 메서드에서 초기화 작업을 수행하고, execute 메서드에서 입력으로 들어온 데이터를 가공 또는 처리하고, 가공한 데이터를 Emit하도록 구현하였습니다. declareOutputFields 메서드는 Emit하는 Tuple의 스키마를 정의합니다.

5. Topology 실행 및 결과 확인

위에서 작성한 Topology를 실행하려면 JDK와 Apache Storm이 설치된 환경에서 다음 명령을 실행해야 합니다.

$ storm jar mytopology.jar MyTopology

위 명령을 실행하면 Topology가 실행되고, Spout에서 전달한 데이터가 Bolt로 전달되어 가공된 후 출력됩니다. Topology 실행 중에는 지정한 시간 동안 실행이 계속되며, 해당 시간이 지나면 Topology가 종료됩니다.

참고 자료