[java] Apache Storm의 Java 필터 기능 사용 방법

Apache Storm은 대규모 실시간 데이터 처리를 위한 분산 컴퓨팅 프레임워크입니다. Java 필터 기능을 사용하면 Storm 토폴로지 내에서 데이터를 필터링하고 처리할 수 있습니다. 이번 블로그 포스트에서는 Apache Storm의 Java 필터 기능을 사용하는 방법에 대해 알아보겠습니다.

필터 스파우트 생성하기

첫 번째 단계는 필터링할 데이터를 만들기 위해 필터 스파우트를 생성하는 것입니다. 아래는 필터 스파우트의 예시 코드입니다:

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

public class FilterSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private Random random;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String[] data = {"apple", "banana", "orange"};
        String randomData = data[random.nextInt(data.length)];
        collector.emit(new Values(randomData));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("fruit"));
    }
}

위의 코드는 필터링할 데이터로 “apple”, “banana”, “orange” 중 하나를 랜덤하게 선정하여 스파우트에서 emit하는 예시입니다.

필터 볼트 생성하기

두 번째 단계는 필터링 로직을 포함한 필터 볼트를 생성하는 것입니다. 아래는 필터 볼트의 예시 코드입니다:

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class FilterBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String fruit = input.getStringByField("fruit");
        if (fruit.equals("apple")) {
            collector.emit(new Values(fruit));
        }
        // 필터링 조건을 추가로 설정할 수 있습니다.
        // 예를 들어, fruit.equals("banana") 인 경우에만 emit할 수도 있습니다.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("filteredFruit"));
    }
}

위의 코드는 스파우트에서 전달받은 데이터의 필터링을 수행하는 볼트의 예시입니다. fruit 필드의 값이 “apple”인 경우에만 해당 데이터를 emit합니다. 필요에 따라 추가적인 필터링 조건을 설정할 수도 있습니다.

Topology 생성과 실행하기

마지막 단계는 필터 스파우트와 필터 볼트를 사용하여 Topology를 생성하고 실행하는 것입니다. 아래는 필터링을 위한 Topology의 예시 코드입니다:

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class FilterTopology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("filterSpout", new FilterSpout());
        builder.setBolt("filterBolt", new FilterBolt()).shuffleGrouping("filterSpout");

        Config config = new Config();
        config.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("filterTopology", config, builder.createTopology());

        Utils.sleep(5000);

        cluster.killTopology("filterTopology");
        cluster.shutdown();
    }
}

위의 코드에서는 FilterSpout와 FilterBolt를 사용하여 Topology를 생성하고, LocalCluster를 사용하여 Topology를 실행합니다. Topology는 “filterSpout” 스파우트에서 “filterBolt” 볼트로 shuffleGrouping을 사용하여 데이터를 전달합니다. 필터링 된 결과는 “filteredFruit” 필드에 저장됩니다.

이제 Apache Storm의 Java 필터 기능을 사용하는 방법에 대해 알아보았습니다. 필요에 따라 필터링 조건을 추가하거나 다른 기능을 구현할 수 있으니 참고하여 적용해 보세요.

참고 자료