
[NodeJS] How to Use KafkaJS - Consumer

sga8 2021. 12. 23. 07:25

 

KafkaJS: 1.14.0 (https://www.npmjs.com/package/kafkajs)

Winston Logger: 3.2.1 (https://www.npmjs.com/package/winston)

 

 

Overall Kafka configuration using the Kafka constructor

  • clientId: an identifier attached to every request sent to the Kafka brokers
  • brokers: addresses of the Kafka brokers, e.g. [ "111.112.113.114:9095", "111.112.113.115:9095", ... ]
  • connectionTimeout: how long to wait for a connection to a broker (default: 1000, in ms)
  • retry
    • initialRetryTime: initial retry delay (ms) when a connection attempt fails
    • retries: maximum number of retries
  • logLevel: the level of logs that KafkaJS should emit through the logger
  • logCreator: the logger that KafkaJS logs are routed to (a custom log creator function; see the sketch after the example below)
const kafka = new Kafka({
  clientId: kafka_config.clientId,
  brokers: kafka_config.brokers,
  connectionTimeout: 10000,
  retry: {
    initialRetryTime: 100,
    retries: 8
  },
  logLevel: logLevel.INFO,
  logCreator: logger_kafka
});
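
The logCreator option expects a KafkaJS custom log creator: a function that receives the configured log level and returns a function that is called for every log entry. The logger_kafka value passed above is assumed to be such a function. A minimal sketch that routes KafkaJS logs into Winston (the level mapping and the console transport are assumptions, not part of the original code) might look like this:

import winston from "winston";
import { logLevel } from "kafkajs";

// Map KafkaJS log levels onto Winston level names
const toWinstonLogLevel = level => {
  switch (level) {
    case logLevel.ERROR:
    case logLevel.NOTHING:
      return "error";
    case logLevel.WARN:
      return "warn";
    case logLevel.INFO:
      return "info";
    default:
      return "debug";
  }
};

// Custom log creator: receives the configured level once,
// then returns the function KafkaJS calls for each log entry
const logger_kafka = kafkaLogLevel => {
  const winstonLogger = winston.createLogger({
    level: toWinstonLogLevel(kafkaLogLevel),
    transports: [new winston.transports.Console()]
  });

  return ({ namespace, level, label, log }) => {
    const { message, ...extra } = log;
    winstonLogger.log({
      level: toWinstonLogLevel(level),
      message: `[${label}] ${namespace} ${message}`,
      extra
    });
  };
};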

 

 

Kafka Consumer configuration

  • groupId: the consumer group ID for this Kafka consumer
  • maxWaitTimeInMs: the maximum time (ms) the broker waits for data to accumulate before answering a fetch request.
    • If this value is too large, near-real-time processing suffers, because the broker may hold a fetch for the full interval when little data is arriving.
const consumer = kafka.consumer({
  groupId: groupId,
  maxWaitTimeInMs: 100
});

 

 

Processing logic for Kafka Consumer messages

The complete consumer module below creates the Kafka client, subscribes to two topics, branches on the topic inside eachMessage, and disconnects the consumer when process errors or termination signals occur.

/* eslint-disable no-unused-vars */

// #################### Kafka Consumer ###########################
import { Kafka, logLevel } from "kafkajs";
import kafka_config from "../kafka-config.json";
import _ from "lodash";
require("dotenv").config();
const NODE_ENV = process.env.npm_lifecycle_event.split(":")[1];

let logger_kafka;
let logger;
module.exports = {
  init: function init(
    _logger_kafka,
    _logger,
  ) {
    logger_kafka = _logger_kafka;
    logger = _logger;
    try {
      const kafka = new Kafka({
        clientId: kafka_config.clientId,
        brokers: kafka_config.brokers,
        connectionTimeout: 10000,
        retry: {
          initialRetryTime: 100,
          retries: 8
        },
        logLevel: logLevel.INFO,
        logCreator: logger_kafka
      });

      let topic_First;
      let topic_Second;
      let groupId;
      logger.info("[ServerLog] NODE_ENV: " + NODE_ENV);
      groupId = "Sga8";
      topic_First = kafka_config.AgentResponseTopic;
      topic_Second = kafka_config.PMURequestTopic;
      logger.info(
        "[ServerLog] Kafka Consumer groupId: " +
          groupId +
          ", topic_First: " +
          topic_First +
          ", topic_Second: " +
          topic_Second
      );
      const consumer = kafka.consumer({
        groupId: groupId,
        maxWaitTimeInMs: 100
      });

      const runConsumer = async () => {
        await consumer.connect();
        await consumer.subscribe({ topic: topic_First, fromBeginning: false });
        await consumer.subscribe({ topic: topic_Second, fromBeginning: false });
        await consumer.run({
          partitionsConsumedConcurrently: 6,
          eachMessage: async ({ topic, partition, message }) => {
            // Dispatch on the topic so each subscription gets its own handler
            if (topic === topic_First) {
                // handler for topic_First
            } else if (topic === topic_Second) {
                // handler for topic_Second
            }
          }
        });
      };

      runConsumer().catch(e => logger.error(`[consumer] ${e.stack}`));
      const errorTypes = ["unhandledRejection", "uncaughtException"];
      const signalTraps = ["SIGTERM", "SIGINT", "SIGUSR2"];

      errorTypes.map(type => {
        process.on(type, async e => {
          try {
            logger.error(`@@@@@@@@@@@ KafkaConsumer.js ${type} @@@@@@@@@@@`);
            logger.error(`process.on ${type}`);
            logger.error(e.stack);
            logger.error(JSON.stringify(e, null, 4));
            await consumer.disconnect();
            // process.exit(0);
          } catch (_) {
            // process.exit(1);
          }
        });
      });

      signalTraps.map(type => {
        process.once(type, async () => {
          try {
            await consumer.disconnect();
          } finally {
            // process.kill(process.pid, type);
          }
        });
      });
    } catch (exception) {
      logger.error("[ServerLog] Kafka Consumer Exception..");
      logger.error(exception.stack);
    }
  }

  // ##################################################################
};
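
Inside eachMessage, message.value and message.key arrive as Buffers (or null, e.g. for tombstone records), so they normally have to be decoded before use. A minimal sketch of what the topic_First branch might call, assuming the producer sends JSON payloads (the handler name and the JSON assumption are hypothetical, not part of the original code):

// Hypothetical handler for messages from topic_First; it lives inside the
// module above, so it can reuse the module-level logger
const handleTopicFirst = async ({ partition, message }) => {
  if (message.value === null) {
    return; // nothing to process (e.g. a tombstone record)
  }
  const payload = JSON.parse(message.value.toString("utf8"));
  logger.info(
    `[consumer] partition=${partition} offset=${message.offset}` +
      (message.key ? ` key=${message.key.toString("utf8")}` : "")
  );
  // ...apply application logic to payload here
};

In the consumer above this would be invoked from the if (topic === topic_First) branch as await handleTopicFirst({ partition, message });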