[java] Java를 사용하여 Apache Storm의 데이터 흐름 조작하기

Apache Storm은 대규모 실시간 데이터 처리를 위한 분산 컴퓨팅 프레임워크입니다. Java를 사용하여 Storm의 데이터 흐름을 조작하는 방법에 대해 알아보겠습니다.

1. Topology 생성

먼저, Storm의 Topology(토폴로지)란 데이터 처리 작업을 구성하는 연산자들의 그래프입니다. Topology는 backtype.storm.topology.TopologyBuilder를 사용하여 생성됩니다.

import backtype.storm.topology.TopologyBuilder;

...

TopologyBuilder builder = new TopologyBuilder();

2. Spout 추가

Spout는 데이터를 생성하고 Storm 토폴로지에 입력하는 역할을 합니다. backtype.storm.topology.TopologyBuildersetSpout 메서드를 사용하여 Spout를 추가할 수 있습니다.

import backtype.storm.topology.SpoutDeclarer;
import backtype.storm.tuple.Fields;

...

SpoutDeclarer spout = builder.setSpout("spout", new MySpout()); // MySpout는 사용자 정의 Spout 클래스입니다.

3. Bolt 추가

Bolt는 데이터를 가공하고 원하는 방식으로 처리하는 역할을 합니다. TopologyBuildersetBolt 메서드를 사용하여 Bolt를 추가할 수 있습니다.

import backtype.storm.topology.BoltDeclarer;

...

BoltDeclarer bolt = builder.setBolt("bolt", new MyBolt()) // MyBolt는 사용자 정의 Bolt 클래스입니다.
                          .fieldsGrouping("spout", new Fields("field1")); // Bolt의 입력 데이터 소스를 지정합니다.

4. Topology 실행

위에서 생성한 Topology를 실행하기 위해 backtype.storm.Config 객체를 생성하고 Topology를 제출합니다.

import backtype.storm.Config;
import backtype.storm.LocalCluster;

...

Config config = new Config();
LocalCluster cluster = new LocalCluster();

cluster.submitTopology("myTopology", config, builder.createTopology());

5. 데이터 흐름 확인

위의 단계들을 모두 거치면 Storm 클러스터에서 Topology가 실행되고 데이터가 흐르게 됩니다. 이를 확인하기 위해 backtype.storm.task.OutputCollector를 사용하여 결과를 출력하거나 다른 작업을 수행할 수 있습니다.

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

...

public class MyBolt extends BaseRichBolt {

    private OutputCollector collector;
    private static final Logger LOGGER = LoggerFactory.getLogger(MyBolt.class);

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // 데이터 처리 작업 수행
        String inputData = input.getStringByField("field1");
        LOGGER.info("Received input: {}", inputData);

        // 결과를 다음 Bolt로 전달
        collector.emit(new Values(inputData.toUpperCase()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("field2"));
    }
}

결론

이제 Java를 사용하여 Apache Storm의 데이터 흐름을 조작하는 방법을 알게 되었습니다. Topology를 만들고 Spout와 Bolt를 추가한 뒤에는 Storm 클러스터에서 데이터 처리 작업을 수행할 수 있게 됩니다.

참고 자료