[NodeJS] How to Use KafkaJS: Consumer
KafkaJS: 1.14.0 (https://www.npmjs.com/package/kafkajs)
Winston Logger: 3.2.1 (https://www.npmjs.com/package/winston)
General Kafka configuration using the Kafka constructor
- clientId: Identifier attached to every request sent to the Kafka broker
- brokers: Addresses of the Kafka brokers, e.g. [ "111.112.113.114:9095", "111.112.113.115:9095", ... ]
- connectionTimeout: Time to wait for a Kafka connection to be established (default: 1000, in ms)
- retry
  - initialRetryTime: Initial retry time when connecting (in ms)
  - retries: Number of retries
- logLevel: Level of the logs emitted through the logger
- logCreator: Function that creates the logger KafkaJS uses to write its logs
const kafka = new Kafka({
  clientId: kafka_config.clientId,
  brokers: kafka_config.brokers,
  connectionTimeout: 10000,
  retry: {
    initialRetryTime: 100,
    retries: 8
  },
  logLevel: logLevel.INFO,
  logCreator: logger_kafka
});
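The snippet above passes `logger_kafka` as `logCreator` without showing what it looks like. As a rough sketch, assuming a Winston-style logger (any object with a `log({ level, message })` method), a `logCreator` could be built as follows. `createKafkaLogCreator` and `fakeLogger` are hypothetical names used only for illustration; the entry fields `namespace`, `label`, and `log` follow the custom logger section of the KafkaJS documentation.

```javascript
// Hypothetical helper: adapts a Winston-style logger (an object exposing
// log({ level, message, ... })) to the logCreator shape KafkaJS expects.
// KafkaJS calls logCreator(logLevel) once, then calls the returned
// function with { namespace, level, label, log } for every log entry.
const createKafkaLogCreator = winstonLogger => () => ({ namespace, label, log }) => {
  const { message, ...extra } = log;
  winstonLogger.log({
    // label is "ERROR", "WARN", "INFO" or "DEBUG"; Winston's default
    // levels use the same names in lower case.
    level: label.toLowerCase(),
    message: namespace ? `[${namespace}] ${message}` : message,
    extra
  });
};

// Stand-in for a real Winston logger so the sketch runs without dependencies.
const captured = [];
const fakeLogger = { log: entry => captured.push(entry) };

// Simulate what KafkaJS would do with the creator (assumed sample entry).
const logFn = createKafkaLogCreator(fakeLogger)();
logFn({
  namespace: "Consumer",
  label: "INFO",
  log: { message: "Starting", groupId: "Sga8" }
});
console.log(captured[0]);
```

With a real Winston instance, the same helper would be wired in as `logCreator: createKafkaLogCreator(winston.createLogger(...))`.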
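Kafka consumer configuration
- groupId: Group ID of the Kafka consumer
- maxWaitTimeInMs: Messages are processed after waiting up to the configured time (ms).
  - If the value is too large, real-time processing becomes difficult, because the consumer can end up waiting the full configured time.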
const consumer = kafka.consumer({
  groupId: groupId,
  maxWaitTimeInMs: 100
});
Processing logic for Kafka consumer messages
/* eslint-disable no-unused-vars */
// #################### Kafka Consumer ###########################
import { Kafka, logLevel } from "kafkajs";
import kafka_config from "../kafka-config.json";
import _ from "lodash";

require("dotenv").config();

// npm_lifecycle_event is only set when the process is started through an
// npm script; fall back to "" so that split() cannot throw.
const NODE_ENV = (process.env.npm_lifecycle_event || "").split(":")[1];

let logger_kafka;
let logger;

module.exports = {
  init: function init(_logger_kafka, _logger) {
    logger_kafka = _logger_kafka;
    logger = _logger;

    try {
      const kafka = new Kafka({
        clientId: kafka_config.clientId,
        brokers: kafka_config.brokers,
        connectionTimeout: 10000,
        retry: {
          initialRetryTime: 100,
          retries: 8
        },
        logLevel: logLevel.INFO,
        logCreator: logger_kafka
      });

      const groupId = "Sga8";
      const topic_First = kafka_config.AgentResponseTopic;
      const topic_Second = kafka_config.PMURequestTopic;

      logger.info("[ServerLog] NODE_ENV: " + NODE_ENV);
      logger.info(
        "[ServerLog] Kafka Consumer groupId: " +
          groupId +
          ", topic_First: " +
          topic_First +
          ", topic_Second: " +
          topic_Second
      );

      const consumer = kafka.consumer({
        groupId: groupId,
        maxWaitTimeInMs: 100
      });

      // Connect, subscribe to both topics, and start consuming.
      const runConsumer = async () => {
        await consumer.connect();
        await consumer.subscribe({ topic: topic_First, fromBeginning: false });
        await consumer.subscribe({ topic: topic_Second, fromBeginning: false });
        await consumer.run({
          partitionsConsumedConcurrently: 6,
          eachMessage: async ({ topic, partition, message }) => {
            if (topic === topic_First) {
              // method for topic_First
            } else if (topic === topic_Second) {
              // method for topic_Second
            }
          }
        });
      };

      runConsumer().catch(e => logger.error(`[consumer] ${e.stack}`));

      const errorTypes = ["unhandledRejection", "uncaughtException"];
      const signalTraps = ["SIGTERM", "SIGINT", "SIGUSR2"];

      // On fatal errors, log the cause and disconnect the consumer.
      errorTypes.forEach(type => {
        process.on(type, async e => {
          try {
            logger.error(`@@@@@@@@@@@ KafkaConsumer.js ${type} @@@@@@@@@@@`);
            logger.error(`process.on ${type}`);
            // A rejection reason is not always an Error, so stack may be missing.
            logger.error(e && e.stack ? e.stack : String(e));
            logger.error(JSON.stringify(e, null, 4));
            await consumer.disconnect();
            // process.exit(0);
          } catch (disconnectError) {
            // process.exit(1);
          }
        });
      });

      // On termination signals, disconnect the consumer once.
      signalTraps.forEach(type => {
        process.once(type, async () => {
          try {
            await consumer.disconnect();
          } finally {
            // process.kill(process.pid, type);
          }
        });
      });
    } catch (exception) {
      logger.error("[ServerLog] Kafka Consumer Exception..");
      logger.error(exception.stack);
    }
  }
  // ##################################################################
};
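The `if` / `else if` chain in `eachMessage` grows with every subscribed topic. One possible alternative, sketched here under the assumption that each topic has exactly one handler, is a lookup table keyed by topic name. `createEachMessage`, the topic names, and the handlers below are hypothetical; only the `{ topic, partition, message }` payload shape comes from the code above.

```javascript
// Hypothetical factory: builds an eachMessage callback from a Map of
// topic name -> async handler. Messages on unknown topics are ignored.
const createEachMessage = handlers => async ({ topic, partition, message }) => {
  const handler = handlers.get(topic);
  if (!handler) {
    return;
  }
  // message.value arrives as a Buffer (or null), so decode it once here.
  const value = message.value === null ? null : message.value.toString("utf8");
  await handler({ partition, offset: message.offset, value });
};

// Illustrative topics and handlers (not taken from the original configuration).
const received = [];
const eachMessage = createEachMessage(
  new Map([
    ["agent-response", async msg => received.push({ topic: "agent-response", ...msg })],
    ["pmu-request", async msg => received.push({ topic: "pmu-request", ...msg })]
  ])
);

// Simulate what consumer.run would deliver for a single message.
const done = eachMessage({
  topic: "agent-response",
  partition: 0,
  message: { offset: "42", value: Buffer.from("hello") }
});
done.then(() => console.log(received));
```

In the consumer above, this would be passed as `eachMessage: createEachMessage(handlers)` inside `consumer.run`, so adding a topic only means adding one entry to the map.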