[java] 아파치 플링크의 스트리밍 SQL(Streaming SQL in Apache Flink)

아파치 플링크는 대규모 데이터 처리를 위한 오픈 소스 스트리밍 처리 엔진입니다. 플링크는 다양한 데이터 처리 작업을 위한 다양한 API를 제공하며, 그 중 하나가 스트리밍 SQL입니다. 스트리밍 SQL을 사용하면 구조화된 데이터를 처리하기 위해 SQL 쿼리를 작성할 수 있습니다.

스트리밍 SQL 사용 예제

아래는 간단한 스트리밍 SQL 쿼리 예제입니다.

TableEnvironment tableEnv = StreamTableEnvironment.create(env);

String sourceDDL = "CREATE TABLE source_table (\n" +
        "  id INT,\n" +
        "  name STRING,\n" +
        "  salary DOUBLE,\n" +
        "  PRIMARY KEY (id) NOT ENFORCED\n" +
        ") WITH (\n" +
        "  'connector.type' = 'kafka',\n" +
        "  'connector.version' = 'universal',\n" +
        "  'connector.startup-mode' = 'earliest-offset',\n" +
        "  'connector.topic' = 'source_topic',\n" +
        "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
        "  'format.type' = 'json',\n" +
        "  'format.derive-schema' = 'true'\n" +
        ")";

String sinkDDL = "CREATE TABLE sink_table (\n" +
        "  id INT,\n" +
        "  name STRING,\n" +
        "  salary DOUBLE\n" +
        ") WITH (\n" +
        "  'connector.type' = 'jdbc',\n" +
        "  'connector.url' = 'jdbc:mysql://localhost:3306/mydb',\n" +
        "  'connector.table' = 'sink_table',\n" +
        "  'connector.username' = 'user',\n" +
        "  'connector.password' = 'password'\n" +
        ")";

String query = "INSERT INTO sink_table\n" +
        "SELECT id, name, salary\n" +
        "FROM source_table\n" +
        "WHERE salary > 50000";

tableEnv.sqlUpdate(sourceDDL);
tableEnv.sqlUpdate(sinkDDL);
tableEnv.sqlUpdate(query);
tableEnv.execute("Streaming SQL Job");

위 예제에서는 source_table에서 구조화된 데이터를 읽어와서 salary가 50000보다 큰 레코드를 sink_table로 출력하는 스트리밍 SQL 쿼리를 정의합니다. source_table은 Kafka 커넥터를 사용하여 데이터를 읽고, sink_table은 JDBC 커넥터를 사용하여 데이터를 저장합니다.

스트리밍 SQL 설정

위 예제에서 사용되는 source_tablesink_table은 각각 Kafka와 JDBC 커넥터를 사용하여 데이터 처리를 수행합니다. 이를 위해 플링크는 테이블 설정을 제공하는 WITH 절을 사용합니다.

connector.type을 사용하여 데이터 소스 및 싱크의 유형을 지정할 수 있습니다. 예제에서는 kafkajdbc를 사용했습니다.

다른 옵션들은 데이터 소스 또는 싱크에 따라 다를 수 있으며, 예제에서는 각 커넥터의 호스트, 포트, 토픽, 데이터 형식 등을 설정합니다.

결론

아파치 플링크의 스트리밍 SQL을 사용하면 SQL 기반으로 구조화된 스트리밍 데이터를 처리할 수 있습니다. 이를 통해 개발자는 기존의 SQL 쿼리 작성 경험을 활용하여 대규모 실시간 데이터 처리 작업을 수행할 수 있습니다.

더 많은 정보를 원하신다면 아파치 플링크 공식 문서를 참조해주세요.