[java] Java를 사용하여 Apache Storm의 이벤트 드리븐 아키텍처 구현하기

Apache Storm은 대규모 실시간 데이터 처리를 위한 분산 스트리밍 플랫폼입니다. 이를 통해 데이터 소스로부터 발생하는 이벤트를 실시간으로 처리하고 결과를 분산 저장 및 분석할 수 있습니다.

아키텍처를 구축하기 위해 Java를 사용할 것이며, 이벤트 드리븐 아키텍처를 따르는 방식으로 구현할 것입니다. 이벤트 드리븐 아키텍처는 비동기적으로 발생하는 이벤트들을 통해 시스템의 동작을 결정하는 방식입니다.

1. Maven 프로젝트 생성

Java로 Storm 아키텍처를 구현하기 위해서는 Maven을 사용하여 프로젝트를 생성해야 합니다.

mvn archetype:generate -DgroupId=com.example -DartifactId=storm-example -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

위 명령어를 사용하여 Maven 프로젝트 템플릿을 생성합니다. 생성된 프로젝트 디렉토리로 이동합니다.

2. 의존성 추가

Storm을 사용하기 위해 Maven 프로젝트의 pom.xml 파일에 다음 의존성을 추가합니다.

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>2.2.0</version>
</dependency>

위 의존성은 Apache Storm의 핵심 모듈인 storm-core를 추가합니다.

3. Topology 작성

Topology는 Storm에서 데이터 처리를 위한 구성이나 로직을 정의하는 부분입니다. 프로젝트 디렉토리에 MainTopology.java 파일을 생성하고 다음과 같이 작성합니다.

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class MainTopology {
    public static void main(String[] args) {
        // Topology 생성
        TopologyBuilder builder = new TopologyBuilder();

        // Spout 추가
        builder.setSpout("spout", new EventSpout());

        // Bolt 추가
        builder.setBolt("bolt", new EventBolt())
                .shuffleGrouping("spout");

        // Config 생성
        Config config = new Config();
        config.setDebug(true);

        // LocalCluster 생성
        LocalCluster cluster = new LocalCluster();

        // Topology 실행
        cluster.submitTopology("event-driven-topology", config, builder.createTopology());

        // Topology 종료
        cluster.shutdown();
    }
}

위 코드에서는 TopologyBuilder를 사용하여 Topology를 생성하고, EventSpoutspout로 설정하여 데이터를 가져옵니다. 그리고 EventBoltbolt로 설정하여 데이터를 처리하도록 합니다.

4. Spout 작성

Spout는 데이터 소스로부터 실시간으로 데이터를 가져오는 역할을 담당합니다. EventSpout.java 파일을 생성하고 다음과 같이 작성합니다.

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class EventSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    @Override
    public void open(Map<String, Object> config, TopologyContext context,
                     SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // 데이터 소스로부터 데이터를 가져옴
        String event = "Sample Event";

        // 가져온 데이터를 출력
        collector.emit(new Values(event));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("event"));
    }
}

위 코드에서는 BaseRichSpout를 상속하여 EventSpout를 구현합니다. nextTuple 메소드에서 데이터 소스로부터 데이터를 가져와 collector를 통해 출력합니다.

5. Bolt 작성

Bolt는 실시간으로 처리된 데이터를 다양한 방식으로 가공 및 처리하는 역할을 담당합니다. EventBolt.java 파일을 생성하고 다음과 같이 작성합니다.

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

public class EventBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> config, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        // 입력된 데이터를 처리하는 로직을 작성
        String event = tuple.getStringByField("event");

        // 처리된 결과를 출력
        System.out.println("Processed event: " + event);

        // 처리 결과를 다음 Bolt로 전달
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Bolt에서 아웃풋 필드를 따로 정의하지 않음
    }
}

위 코드에서는 BaseRichBolt를 상속하여 EventBolt를 구현합니다. execute 메소드에서 입력된 데이터를 처리하고, 결과를 출력하며 다음 Bolt로 전달합니다.

6. 실행

프로젝트 디렉토리로 이동한 후 다음 명령어를 사용하여 코드를 컴파일하고 실행합니다.

mvn clean package
storm jar target/storm-example-1.0-SNAPSHOT.jar com.example.MainTopology

위 명령어를 실행하면 Storm 클러스터가 로컬 모드로 실행되며, Topology가 실행됩니다. 이벤트 소스로부터 가져온 데이터가 체인지 터벤튼에서 처리되고, 처리된 결과가 출력됩니다.

결론

Java를 사용하여 Apache Storm의 이벤트 드리븐 아키텍처를 구현하는 방법에 대해 알아보았습니다. Storm을 사용하여 실시간으로 대용량 데이터를 처리하고 결과를 분석하는 기능을 구현하기 위해 위의 단계를 따라해보세요.

참고 자료