Apache Storm은 대규모 실시간 데이터 처리를 위한 분산 컴퓨팅 프레임워크입니다. Java를 사용하여 Storm의 데이터 흐름을 조작하는 방법에 대해 알아보겠습니다.
1. Topology 생성
먼저, Storm의 Topology(토폴로지)란 데이터 처리 작업을 구성하는 연산자들의 그래프입니다. Topology는 backtype.storm.topology.TopologyBuilder
를 사용하여 생성됩니다.
import backtype.storm.topology.TopologyBuilder;
...
TopologyBuilder builder = new TopologyBuilder();
2. Spout 추가
Spout는 데이터를 생성하고 Storm 토폴로지에 입력하는 역할을 합니다. backtype.storm.topology.TopologyBuilder
의 setSpout
메서드를 사용하여 Spout를 추가할 수 있습니다.
import backtype.storm.topology.SpoutDeclarer;
import backtype.storm.tuple.Fields;
...
SpoutDeclarer spout = builder.setSpout("spout", new MySpout()); // MySpout는 사용자 정의 Spout 클래스입니다.
3. Bolt 추가
Bolt는 데이터를 가공하고 원하는 방식으로 처리하는 역할을 합니다. TopologyBuilder
의 setBolt
메서드를 사용하여 Bolt를 추가할 수 있습니다.
import backtype.storm.topology.BoltDeclarer;
...
BoltDeclarer bolt = builder.setBolt("bolt", new MyBolt()) // MyBolt는 사용자 정의 Bolt 클래스입니다.
.fieldsGrouping("spout", new Fields("field1")); // Bolt의 입력 데이터 소스를 지정합니다.
4. Topology 실행
위에서 생성한 Topology를 실행하기 위해 backtype.storm.Config
객체를 생성하고 Topology를 제출합니다.
import backtype.storm.Config;
import backtype.storm.LocalCluster;
...
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("myTopology", config, builder.createTopology());
5. 데이터 흐름 확인
위의 단계들을 모두 거치면 Storm 클러스터에서 Topology가 실행되고 데이터가 흐르게 됩니다. 이를 확인하기 위해 backtype.storm.task.OutputCollector
를 사용하여 결과를 출력하거나 다른 작업을 수행할 수 있습니다.
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
...
public class MyBolt extends BaseRichBolt {
private OutputCollector collector;
private static final Logger LOGGER = LoggerFactory.getLogger(MyBolt.class);
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
// 데이터 처리 작업 수행
String inputData = input.getStringByField("field1");
LOGGER.info("Received input: {}", inputData);
// 결과를 다음 Bolt로 전달
collector.emit(new Values(inputData.toUpperCase()));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("field2"));
}
}
결론
이제 Java를 사용하여 Apache Storm의 데이터 흐름을 조작하는 방법을 알게 되었습니다. Topology를 만들고 Spout와 Bolt를 추가한 뒤에는 Storm 클러스터에서 데이터 처리 작업을 수행할 수 있게 됩니다.