Coding/Server

Kafka를 이용해 로그 커스텀하여 저장하기(feat.ksqldb)

kangplay 2025. 9. 15. 13:06

1. 메시지 큐와 MOM

카프카를 이해하기 위해서는 메시지 큐와 MOM을 먼저 알아야한다. 메시지 큐는 분산화된 환경에서 발신자와 수신자 사이에서 메시지를 전송하고, 수신하는 기술을 의미한다. MOM(message oriented middleware)를 통해서 구현된다.

메시지 큐를 사용하면 발신자와 수신자가 서로를 직접 알 필요가 없으므로 느슨한 결합으로 만들어낼 수 있다. 즉, 발신자와 수신자가 서로에게 의존하지 않으므로, 독립적으로 확장될 수 있다.

또한, 수신자 서비스가 당장 장애 상황이더라도 모두 메시지 큐에 남아있으므로 결국 모든 메시지가 소비자 서비스에게 전달된다는 보장성을 갖는다. 

마지막으로 비동기 통신으로 구현하여 무거운 작업을 요청할 수 있다.

 

메시지 큐는 크게 Point to Point와 Pub/Sub 모델로 구분할 수 있다. Point to Point(이하 P2P) 모델은 메시지 전송 대상이 한 대로 고정되어있다. 반면, Pub/Sub 모델은 발신자가 토픽이라고 불리는 공간에 메시지를 전송하면, 그 토픽을 구독하고 있는 수신자 모두 메시지를 수신하는 방식이다.

*카프카는 Pub/Sub 모델만을 지원한다.

2. 카프카

카프카는 높은 확장성과 내결함성, 대용량 데이터 처리, 실시간 데이터 처리에 특화되어 있는 오픈소스 메시징 시스템이다. 카프카를 사용하여 데이터 흐름을 중앙화한다면, 복잡도가 낮아질 수 있다. 

카프카는 연속된 데이터 레코드들의 스트림을 토픽이라는 카테고리로 구분한다. 각 토픽은 다시 파티션으로 나누어, 단일 혹은 여러 서버에 분산 저장된다.

  • 브로커: 클러스터 안에 구성된 여러 서버 중 각 서버를 의미한다. 레코드 형태인 메시지 데이터의 저장과 검색 및 컨슈머에게 전달하고 관리한다.
  • 프로듀서: 토픽에 레코드를 전송 또는 생성하는 엔터티로, 데이터의 진입점이다.
  • 컨슈머: 토픽에서 레코드를 읽는 출구점으로, 하나 이상의 토픽을 구독하고, 브로커로부터 레코드를 소비한다.
  • 토픽: 프로듀서로부터 전송된 레코드 카테고리로 토픽 단위로 레코드를 분류하여 저장한다.

3. KSQL

카프카는 내부 토픽을 Sub하여 분석하거나, Sub한 토픽을 정제하여 다시 토픽으로 Pub할 수 있다. 이러한 파이프라인을 구축할 때, 다음과 같은 세 가지 방법을 고려할 수 있다.

  1. Kafka Client를 이용한 Consumer/Producer 직접 구현 배포
  2. Kafka Streams API 라이브러리를 이용한 어플리케이션 구현 배포
  3. KSQL문으로 로직을 구현하고 KSQL 서버에 배포-> 가장 추상화된 방식
🌟 Batch Processing vs Stream Processing
Ksql은 Streaming processing을 하기 위한 툴이다. 따라서 Batch Processing과 Stream Processing의 차이점을 알아야한다.
- Batch Processing은 한 번에 많은 양의 데이터를 모아서 처리하기 위해 나온 기술로, 일정 주기로 처리하기 위함이다. Spark, Storm 등이 있다.
- Stream Processing은 데이터가 무한하다는 가정에서, 적은 양의 데이터를 모아 실시간으로 처리하는 기술이다. 처리량보다는 처리 속도에 포커스를 둔 기술이다.

4. KSQL 쿼리

  1. 스키마 등록 (DDL: Data Definition Language)
    • CREATE STREAM … WITH (~)
      • 기존 Kafka 토픽을 읽어서 스트림으로 매핑하는 방식으로, 해당 토픽을 ksqldb에서 이런 스키마로 본다고 선언하는 것이다. 
      • 실제로 데이터를 소비하거나 생산하는 일을 시작하지는 않는다.
      • Kafka에 반영되지 않고 단지 ksqlDB 메타데이터에 등록된다.
  2. Push 쿼리
    • CREATE STREAM AS SELECT (CSAS)
      • SELECT 결과를 새로운 토픽에 쓰는 스트림을 만드는 방식이다. (*상태가 존재하지 않음)
      • 각 거래 데이터를 독립적으로 처리할 때 사용한다. 이전에 어떤 제품이 지나갔는지 기억하지 못하며, 사용자별 누적 거래액 계산을 못한다.
    • CREATE TABLE AS SELECT (CTAS)
      • 집계 결과를 Kafka 토픽과 ksql DB 상태스토어에 저장한다. (*상태가 존재)
      • 과거의 데이터를 바탕으로 현재의 값을 계속 갱신할 수 있다. 따라서 집계 연산, Group By, Join 을 사용하기 위해선 결과물이 항상 TABLE이 되어야한다.
      • 결과 테이블에는 최종 값만 저장이 된다.
  3. Pull 쿼리
    • SELECT … FROM my_table WHERE KEY = '…';
      • 테이블의 현재 상태를 요청 시점에 한 번만 가져오는 읽기 전용 쿼리이다

📈 EMIT CHANGE vs EMIT FINAL

EMIT CHANGES는 스트림 쿼리의 기본 동작으로, 결과가 변경될 때마다 실시간으로 결과를 내보낸다.
EMIT FINAL은 윈도우 기반의 집계 쿼리에서만 사용되며, 윈도우가 종료될 때 최종 결과만을 내보낸다.

5. KSQL 쿼리 예제

  • 목표: 사용자별 활동 세션 분석하기
    • 사용자가 60초 이상 아무 활동이 없으면
    • 각 세션동안 몇 개의 페이지를 봤는지
    • 어떤 페이지들을 중복 없이 방문했는지
    • 세션 유지 시간은 얼마인지 실시간으로 집계
  • Mock 데이터
    • 카프카 커텍터란, 개발자가 매번 데이터를 보내는 프로듀서나 데이터를 가져오는 컨슈머 코드를 직접 짤 필요없이, 간단한 설정만으로 데이터 파이프라인을 구축하게 해주는 도구이다.
      • 싱크 커넥터: 카프카 토픽에 쌓여있는 데이터를 소비(Consume)해서 지정된 외부 시스템으로 내보내는(Sink) 역할을 한다. 내부적으로 카프카 컨슈머처럼 동작한다.
      • 소스 커넥터: 다양한 외부 데이터 소스로부터 데이터를 읽어와 카프카 토픽으로 보내는(Produce) 역할을 합니다. 내부적으로 카프카 프로듀서처럼 동작한다.
    • Datagen 커넥터를 사용하여 가상의 데이터 스트림을 만든다. (소스 커넥터)
  • 쿼리
CREATE TABLE user_sessions AS
  SELECT
    userid,
    COUNT(*) AS page_views,
    COLLECT_SET(pageid) AS distinct_pages,
    (MAX(ROWTIME) - MIN(ROWTIME)) / 1000 AS session_duration_seconds
  FROM
    pageviews
  WINDOW SESSION (60 SECONDS) //비활동 간격이 60초인 경우
  GROUP BY
    userid
  EMIT CHANGES;

6. 트러블 슈팅

  • Key 값 직렬화 오류
    • 기본 개념: 다른 서버로 데이터를 보내거나, 데이터베이스에 영구적으로 상태를 저장해야할 때, 메모리 구조 자체를 저장하거나 통신할 수 없다. 따라서 객체를 직렬화하여 바이트 스트림 형태로 만들어 전송하고, 받는 쪽에서 역직렬화하여 사용한다. (JSON, XML 등) 따라서, KSQLDB에서 KAFKA 토픽으로 내보낼 때 직렬화해야한다.
    • 문제: 근데 Key 값이 깨져서 보이는 오류가 발생했다. (GROUP BY가 trid라, 자동으로 Key가 trid로 된다고 생각)
    • 이유: GROUP BY에 WINDOW(SESSION, TUMBLING 등)를 함께 사용하면, ksqlDB는 내부적으로 (키, 윈도우 시작시간)을 묶어 복합 키(Composite Key)로 사용한다. 윈도우 집계를 사용하는 한 TRID만 단독으로 키가 되게 하는 것은 불가능하다.
    • 해결: 추가적인 스트림을 두 개 추가하여 trid만 KEY에 저장되도록 수정하였다.
  • timestamp 간 간격 구하기
    • 문제: trid를 Group by로 묶어 timestamp 간 간격을 새로운 토픽에 저장하려고 하지만, timestamp 간격을 구할 수 없었다.
    • 이유: ksql 문법만으로는 이전 행의 데이터를 가져와 계산할 수 없다. 단, 윈도우 집계 안에서만 제한적으로 사용 가능하다. 
    • 해결: LATEST_OFFSET 함수를 이용했다.
CREATE OR REPLACE TABLE TRID_TS_TABLE AS
SELECT
    TRID,
    AS_VALUE(trid) AS "trid",
    LATEST_BY_OFFSET(`timestamp`,2)[1] AS latest_ts,
    LATEST_BY_OFFSET(`timestamp`, 1)[1] AS prev_ts,
    LATEST_BY_OFFSET(`timestamp`,1)[1] - LATEST_BY_OFFSET(`timestamp`, 2)[1] AS timestamp_delta
FROM
    ORIGINAL_STREAM
WINDOW SESSION (5 SECONDS)
GROUP BY
    TRID
EMIT CHANGES;