본문 바로가기
  • SDXL 1.0 + 한복 LoRA
  • SDXL 1.0 + 한복 LoRA
Study/빅데이터

[간보기 | kafka] 쉬어가자 - 소스 정리와 모니터링 툴

by 마즈다 2016. 4. 30.
반응형



소스 정리와 모니터링 툴


소스 정리

일단 급하게 기능을 확인하다보니 소스 코드가 엉망이다.
조금이나마 다듬어야 보기가 편할 것 같아 쉬어갈 겸 
우선 node 소스들을 정리했다.


tmgetter.js 
(트위터 메시지를 받아서 콜백 함수를 통해 topic으로 메시지를
보내는 모듈)

var Twitter = require('twitter');
var count = 0;


var client = new Twitter({
  consumer_key: '...',
  consumer_secret: '...',
  access_token_key: '...',
  access_token_secret: '...',
});

var msgArr = new Array();

/**
*  parameter
* msgCount : kafka 토픽으로 보낼 메시지 배열의 수
* sendMessage : kafka 토픽으로 메시지를 보낼 콜백 함
**/
var getTwitterMessage = function(kafka, msgCount, sendMessage) {
   /**
   * Stream statuses filtered by keyword
   * number of tweets per second depends on topic popularity
   **/
   client.stream('statuses/sample', {track: ''},  function(stream){
      stream.on('data', function(tweet) {
        if (tweet.lang == 'kr' || tweet.lang == 'ko') {
          if (count % msgCount == 0) {
            // 메시지 배열에 값이 들어있는 경우 초기화 시
            if (msgArr.length > 0) {
              while(msgArr.length > 0) {
                msgArr.pop();
              }
            }
          }

          var messageStr = extractMessage(tweet);

          msgArr.push(messageStr);

          if ((count % msgCount) == (msgCount - 1)) {        
            if (typeof sendMessage === 'function') {
              var Producer = kafka.Producer;
              var Client = kafka.Client;
              var client = new Client('NAMENODE.local:2181,SECONDARY-NAMENODE.local:2181,DATANODE1.local:2181,DATANODE2.local:2181,DATANODE3.local:2181');

              var topic = 'twittopic';
              var producer = new Producer(client, { requireAcks: 1 });

              sendMessage(producer, msgArr, count);
            }
          }
          count++;
        }
      });

      stream.on('error', function(error) {
        console.log(error);
      });
    });

}

// 트위터 메시지에서 필요한 부분만 추출하는 함수
var extractMessage = function(tweet) {
  var messageObj = {};

  messageObj.createdAt = tweet.created_at;
  messageObj.id = tweet.id;
  messageObj.idStr = tweet.id_str;
  messageObj.text = tweet.text;
  messageObj.UserScreenName = tweet.user.screen_name;
  
  messageObj.retweetCreatedAt = '';
  messageObj.retweetId = '';
  messageObj.retweetIdStr = '';
  messageObj.retweetText = '';
  messageObj.retweetUserScreenName = '';
  messageObj.retweetCount = '';
  
  if (tweet.retweeted_status) {
    messageObj.retweetCreatedAt = tweet.retweeted_status.created_at;
    messageObj.retweetId = tweet.retweeted_status.id;
    messageObj.retweetIdStr = tweet.retweeted_status.id_str;
    messageObj.retweetText = tweet.retweeted_status.text;
    messageObj.retweetUserScreenName = tweet.retweeted_status.user.screen_name;
    messageObj.retweetCount = tweet.retweeted_status.retweet_count;
  }
  
  return JSON.stringify(messageObj)
}

exports.getTwitterMessage = getTwitterMessage;

sender.js
(트위터 메시지를 topic에 전달할 모듈)

var sendMessage = function(producer, msgArr, count) {

  console.log('==start sendMessage=================================================================');

  producer.on('ready', function () {
    console.log('-> producer ready');

    var p = count % 3;
    producer.send([
        {topic: 'twittopic', partition: p, messages: msgArr, attributes: 0 }
      ], function (err, data) {
          if (err) {
            console.log('-> Error : ' + err);
          }
          else {
            console.log('-> send %d messages', count);
            console.log('-> send to %d Partition', p);
          }

          producer.close(function() {
            console.log('-> close produce');
            console.log('==end sendMessage===================================================================');
          });
    });
  });

  producer.on('error', function (err) {
    console.log('error', err)
  });
};

exports.sendMessage = sendMessage;

index.js
(실행 모듈)

var kafka = require('kafka-node');

var tmgetter = require('./tmgetter');
var sender = require('./sender');
// producer를 별도의 모듈로 만들고 오브젝트를 생성하여 사용하고자 했으나
// 이렇게 하면 무슨 이유인지 producer가 작동하지 않는다.
//  producer의 ready 이벤트가 어떤 조건에서 발생하는지 모르겠다...ㅠ.ㅠ
//var producer = require('./producer');


tmgetter.getTwitterMessage(kafka, 20, sender.sendMessage);

일단 이렇게 소스는 조금 정리를 했다.
Producer쪽 로그도 조금 깔끔하게 다듬었다.



모니터링 툴

Kafka Manager


비교적 다양한 정보를 볼 수 있는 장점이 있다. 물론 그만큼 설정이
복잡하다. 물론 기본적인 설정만으로도 웬만한 정보는 확인 가능하며
지난 포스팅에 언급한 것 처럼 JMX 기반으로 정보를 확인하고자 하면
kafka 서버 실행시 다음과 같이 JMX 포트를 명시해주어야 한다.


env JMX_PORT=9999 ./kafka-server-start.sh ../config/server.properties


설치 및 실행은 지난 포스팅에 링크한 github 페이지를 참고하도록 하고
오늘은 화면 설명만 간단하게 하겠다.


클러스터 설정 화면


일단 가장 기본적인 Cluster Name을 정해주고
Cluster Zookeeper Hosts설정을 해준다. 제목대로 Zookeeper 노드의 
주소와 포트를 넣어주면 된다. 다음 Kafka Version을 선택하고 마지막으로 
JMX 관련 설정을 하면 기본적인 설정은 끝이다. 나머지 설정들은 기본값으로…
설정이 끝났으면 화면 맨 아래의 SAVE 버튼을 누르자.


설정을 저장하고나서 상단 Cluster메뉴에서 List를 선택하면
아주 심플한 화면에 방금 설정을 마친 클러스터 이름이 보인다.

이 클러스터 이름을 클릭하면 다음과 같은 화면이 나온다.


여전히 화면은 심플하지만 상단에 뭔가 메뉴가 잔뜩 생겼다.
사실 상단 메뉴의 대부분은 뭔가를 생성하거나 변경하는 내용들이라
아직 직접 사용해 보지는 않았다. 이제 친숙한 용어들을 눌러보자


예상했듯이 topic 목록이 나온다. 나는 토픽을 1개만 생성을 했기 때문에
하나의 항목이 보이고 보다시피 topic 이름은 ‘twittopic’이다.
내용을 보면 알겠지만 3개의 partition이 있고 5개의 broker가 구동중이며
3개의 복제본이 있다. 다른 항목들은 그냥 보면 되겠는데 
Brokers Skew %라는 것이 뭔지 도통 모르겠다.
topic 이름을 클릭해보자


뭔가 조금 복잡한 화면이 나왔지만 살펴보연 목록에 나왔던 내용들의

반복에 특정 시간 간격에서의 전송량 등 쉽게 알 수 있는 내용들이
표시된다. 특히 Partitions by Broker를 보면 topic의 각 partition이
전체 broker에 어떻게 복제가 되어있는지 알 수 있다.
여기도 역시 Skewed라는 항목이 있는데 skewed의 사전적 의미에
‘왜곡된’, ‘편향된’ 이라는 의미가 있고 Skewed가 id 2번의 브로커만
true이면서 3개의 partition의 복제본이 모두 존재하는 상황으로
미루어보건데 partition의 복제본이 정도 이상으로 집중된 상황을
표시하는 것 같다. 만일 복제본을 5개로 설정했다면 전체 Broker가
모두 Skewed 값이 true가 되고 Broker Skewed %도 100%가
되지 않알까 싶다. 요건 추후 테스트해봐야겠다.


만일 JMX를 사용하지 않게 되면 Metrics 영역에 보이는 회색 타원
영역의 값들이 표시되지 않는다.


Consumers consuming from this topic는 Consumer 그룹을
보여준다. 나는 하나의 컨슈머 그룹만 설정을 했고 그 이름을
kafka-node-group으로 했기에 하나만 보여지고 있다. 한 번 눌러보자


현재 topic partition이 3개이고 consumer가 하나의 그룹에 3개의
인스턴스로 존재하기 때문에 각각의 partition과 consumer는 1:1로
연결이 된다.


다시 돌아가서 이번에는 Broker를 한 번 확인해보자. 현재 3개의 
복제본이 모두 저장되어있는 2번 Broker를 클릭해보겠다.


역시 조금 복잡해보이지만 앞의 정보들이 반복해서 보여진다.
처음으로 그럴듯한 챠트가 하나 나왔다. 시간 흐름에 따라 처리되는
메시지의 양을 보여주는 그래프다. Per Topic Detail영역을 보면
토픽에 대한 정보를 볼 수 있는데 앞의 메뉴에서 이미 확인된 내용들이다.


이렇게 뭔가 화면이 많고 복잡해 보이기는 하나 대부분 중복된
내용이 많다. 그리고 JMX를 이용하지 않을 경우 처리량에 해당하는 정보
(회색 타원 영역과 그래프 등)는 볼 수 없다.


Kafka Consumer Offset Monitor


이 툴은 이름에서도 알 수 있듯이 Cunsumer가 message를 소비하는데
초점이 맞춰져있다. 굉장히 심플하다. 별다른 설정도 필요 없다.
메인 메뉴는 다음과 같다.


Consumer Groups를 선택한 화면이다.
앞서 언급한 것처럼 나는 kafka-node-group이라는 하나의 그룹만을
가지고 있다.


kafka-node-group을 눌러보자



아주 깔끔하게 Broker 목록과 Consumer의 message 소비 상황이보인다.


전체 Broker는 5대의 node(Mac mini)에서 실행되고 있지만 
Consumer는 2, 3, 4번 Broker에서만 실행되고 있기 때문에 Broker
목록에 2, 3, 4번만 보인다.


Consumer Offsets 영역에는 partition 번호와 소비한 message
offset과 size 등의 정보가 보여진다. 그리고 topic 이름인 twittopic을
클릭하면 아래와 같은 그래프가 표시된다.



zoom에서 선택한 시간 동안의 message 소비의 변화량을 보여준다.


메인 메뉴의 2번째 메뉴인 Topic List 메뉴도 결국에는 이 화면으로 
들어오게 된다.


마지막의 Visualizations 메뉴는 클러스터의 구조를 트리 형태로
보여주는 메뉴이며 다음 2개의 화면을 제공한다.


이상과 같이 이번 포스팅에서는 간단하게 2개의 kafka 모니터링 툴에 대해
알아보았다. 아직 잘 모르는 항목들과 기능 버튼들이 있는데 이런 부분들은
추후 확인되는대로 내용을 보완해나가도록 하겠다.


반응형