[java] 자바로 카프카와 함께하는 실시간 주식 시세 처리

이번에는 자바 프로그래밍 언어를 사용하여 카프카로 실시간 주식 시세를 처리하는 방법에 대해 알아보겠습니다. 카프카는 대용량의 데이터를 실시간으로 처리하기 위한 분산 메시징 시스템으로 많은 기업에서 사용하고 있습니다.

1. 카프카 설정

먼저, 카프카 클러스터를 설정해야 합니다. 카프카 클러스터를 구성하고 토픽을 생성하는 방법은 공식 문서를 참고하시면 됩니다.

2. 주식 데이터 수집

주식 데이터를 실시간으로 수집하기 위해 외부 API를 사용할 수 있습니다. 예를 들어 Alpha Vantage API를 사용하여 실시간 주식 시세를 얻을 수 있습니다. 이 API는 RESTful 웹 서비스로 주식 데이터를 제공합니다.

먼저, Maven을 사용하여 HttpClient를 추가해 줍니다.

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.13</version>
</dependency>

다음으로, Alpha Vantage API로부터 주식 데이터를 조회하는 코드를 작성합니다.

import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.IOException;

public class StockDataCollector {

    private static final String API_KEY = "YOUR_API_KEY";
    private static final String SYMBOL = "AAPL";
    private static final String FUNCTION = "TIME_SERIES_INTRADAY";
    private static final String INTERVAL = "1min";
    private static final String OUTPUT_SIZE = "compact";

    public static void main(String[] args) {
        CloseableHttpClient httpClient = HttpClients.createDefault();
        HttpGet httpGet = new HttpGet(getApiUrl());

        try {
            HttpResponse response = httpClient.execute(httpGet);
            String jsonString = EntityUtils.toString(response.getEntity());

            // 주식 데이터 처리 로직 작성
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                httpClient.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private static String getApiUrl() {
        return "https://www.alphavantage.co/query?function=" + FUNCTION +
                "&symbol=" + SYMBOL +
                "&interval=" + INTERVAL +
                "&outputsize=" + OUTPUT_SIZE +
                "&apikey=" + API_KEY;
    }
}

위 코드에서 API_KEY는 Alpha Vantage API에서 발급받은 키를 입력해야 합니다. SYMBOL은 조회할 주식의 심볼을 입력하고, FUNCTION은 데이터 조회 함수, INTERVAL은 데이터 주기, OUTPUT_SIZE는 데이터 크기를 의미합니다.

3. 데이터 처리 및 전송

주식 데이터를 수신한 후, 데이터를 처리하고 카프카로 전송하는 코드를 작성해야 합니다. 카프카 클라이언트를 사용하여 데이터를 전송할 수 있습니다.

먼저, Maven을 사용하여 카프카 클라이언트를 추가해 줍니다.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.0</version>
</dependency>

다음으로, 카프카로 데이터를 전송하는 코드를 작성합니다.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class StockDataProcessor {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "stock-data";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(properties);

        // 주식 데이터를 Kafka로 전송하는 로직 작성

        producer.close();
    }

    private static void sendStockData(Producer<String, String> producer, String stockData) {
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, stockData);
        producer.send(record);
    }
}

위 코드에서 BOOTSTRAP_SERVERS는 카프카 브로커의 주소를 입력합니다. TOPIC은 카프카 토픽의 이름을 의미합니다.

주식 데이터를 처리하고 Kafka로 전송하는 로직은 여기에 추가해 주시면 됩니다. 위의 예시 코드는 단순히 KafkaProducer를 사용하여 주식 데이터를 전송하는 예시입니다.

마무리

이렇게 자바로 카프카와 함께 실시간 주식 시세를 처리하는 방법을 알아보았습니다. 주식 데이터 수집 및 처리, 그리고 카프카로 데이터 전송에는 다양한 방법과 라이브러리를 사용할 수 있으니 자신에게 맞는 방법을 선택하여 프로젝트에 적용해 보세요.

참고 자료