Kafka 정리를 마치며


분산 시스템 관리의 어려움

얼추 node 모듈을 이용한 kafka 서비스가 구현이 된 것 같았다.
트위터 Streaming API를 이용하여 데이터를 잘 가져오고,
producer는 이 데이터를 broker에게 잘 전달하고,
consumer는 broker로부터 데이터를 잘 가져와 로그를 뿌려주고…


하지만 어느 순간 이러한 프로세스가 중지되어있기 일쑤였다.
zookeeper쪽이나 kafka쪽이나 서버 콘솔에 출력되는 로그는
대체로 네트워크가 끊겼다는 메시지인데 도대체 이 문제가 어떤 원인으로
발생하는 지를 알 수가 없는 것이다.


애초에 분산 시스템에서 장애의 원인을 찾는 것은 매우 어려운 일이라는 것은
알고 있었지만 아무리 작은 클러스터라도 이 문제를 직접 겪으니 참
답이 안나온다.(물론 나의 경험과 지식의 부족이 가장 큰 역할을 했겠지만…ㅠ.ㅠ)


물리적인 네트워크가 문제인지, zookeeper에서 문제가 발생한 것인지
kafka에서 문제가 발생한 것인지…게다가 zookeeper와 kafka의 장애에 대한
상세한 자료들은 찾기가 쉽지 않아서…


결국 이 문제로 거의 2주 가량을 별다른 진척 없이 zookeeper와 kafka의
에러 로그에 대한 구글링만 하면서 보냈다.


혹시나 해서 몇대 안되는 클러스터에서 잔뜩 돌아가고 있던 HBase와
Storm 서버들도 모두 죽여버렸다.

최종적으로 구성된 나의 허접한 클러스터는 아래와 같다.



<가난한 자의 클러스터 이미지>


황당한 원인과 새로운 문제


사실 문제는 너무나 명백한 곳에 있었다.
현재 총 5대의 맥미니로 구성된 클러스터에서 4대는 서버 전용으로만
사용했으나 1대를 일반 가정용 용도로도 사용을 하는 과정에서 서버 전용의
4대는 절전 모드를 꺼놓았는데 이 한 대에 대해서는 절전모드를 켜놓은
상태였던 것이다. 그러니 절전모드로 들어가면서 이 한대에서 돌고있던
zookeeper, kafka는 물론 여기서 돌고 있던 node.js의 producer 모듈까지
모두 맛이 가버린 것이다. 


결국 전기세의 압박에도 불구하고 절전모드를 모두 해제하고 상황을 지켜보았더니
트위터 메시지 건수 기준으로 기존에 4~5천 건에서 죽던 프로세스가
대략 7만 건 이상을 처리할 수 있게 되었다.


하지만 아직도 갈 길이 먼 것이 약 7만 건 정도 처리를 하고 나면 zookeeper에서
Purge task가 발생을 하는데 이 시점에서 producer 프로세스가 멈춰버린다.
zookeeper의 purse 관련 설정에 대해 알아보고는 있으나 역시 자료도 많지 않고
나의 무식은 큰 장애가 되고 있고…ㅠ.ㅠㅠ


일단 다음 단계로


내가 하고자 하는 것은 트위터 데이터를 모아 형태소 분석을 거쳐 특정 시점에
가장 많이 언급된 단어들을 추출하고 그 단어에 대한 긍정/부정의 평가를 한 후
다시 그 단어가 언급된 공인 미디어를 검색하여 긍정/부정 평가에 대한
공신력을 추가하는 작업이다.


그 중에 이제 데이터 수집 단계를 진행하고 있으며 기왕이면 공부좀 해보자고
kafka에 손을 대본 것인데 역시 한계가 있다. 하지만 목표한 바를 진행하면서
최대한 틀린 부분을 바로 잡고 몰랐던 것을 채워 나가야겠다.


당장에 진행할 다음 단계는 현재 일없이 로그만 찍어대고 있는 consumer에
제대로 된 역할, 즉 Hadoop으로 파일을 저장하는 일을 좀 시키려고 한다.
역시 node 모듈을 사용할 것이고 그 과정 또한 차근차근 정리해 볼 생각이다.


선무당이 사람 잡는다.

잘 모르는 내용을 억지로 진행하다보니 잘못된 정보를 기록하게 되는 경우도
많은 것 같다. 앞으로는 가급적 핵심적인 내용은 잘 정리된 외부 사이트를
인용을 하고 내가 실제 눈으로 본 것들을 중심으로 정리를 해야겠다.

저작자 표시
신고



소스 정리와 모니터링 툴


소스 정리

일단 급하게 기능을 확인하다보니 소스 코드가 엉망이다.
조금이나마 다듬어야 보기가 편할 것 같아 쉬어갈 겸 
우선 node 소스들을 정리했다.


tmgetter.js 
(트위터 메시지를 받아서 콜백 함수를 통해 topic으로 메시지를
보내는 모듈)

var Twitter = require('twitter');
var count = 0;


var client = new Twitter({
  consumer_key: '...',
  consumer_secret: '...',
  access_token_key: '...',
  access_token_secret: '...',
});

var msgArr = new Array();

/**
*  parameter
* msgCount : kafka 토픽으로 보낼 메시지 배열의 수
* sendMessage : kafka 토픽으로 메시지를 보낼 콜백 함
**/
var getTwitterMessage = function(kafka, msgCount, sendMessage) {
   /**
   * Stream statuses filtered by keyword
   * number of tweets per second depends on topic popularity
   **/
   client.stream('statuses/sample', {track: ''},  function(stream){
      stream.on('data', function(tweet) {
        if (tweet.lang == 'kr' || tweet.lang == 'ko') {
          if (count % msgCount == 0) {
            // 메시지 배열에 값이 들어있는 경우 초기화 시
            if (msgArr.length > 0) {
              while(msgArr.length > 0) {
                msgArr.pop();
              }
            }
          }

          var messageStr = extractMessage(tweet);

          msgArr.push(messageStr);

          if ((count % msgCount) == (msgCount - 1)) {        
            if (typeof sendMessage === 'function') {
              var Producer = kafka.Producer;
              var Client = kafka.Client;
              var client = new Client('NAMENODE.local:2181,SECONDARY-NAMENODE.local:2181,DATANODE1.local:2181,DATANODE2.local:2181,DATANODE3.local:2181');

              var topic = 'twittopic';
              var producer = new Producer(client, { requireAcks: 1 });

              sendMessage(producer, msgArr, count);
            }
          }
          count++;
        }
      });

      stream.on('error', function(error) {
        console.log(error);
      });
    });

}

// 트위터 메시지에서 필요한 부분만 추출하는 함수
var extractMessage = function(tweet) {
  var messageObj = {};

  messageObj.createdAt = tweet.created_at;
  messageObj.id = tweet.id;
  messageObj.idStr = tweet.id_str;
  messageObj.text = tweet.text;
  messageObj.UserScreenName = tweet.user.screen_name;
  
  messageObj.retweetCreatedAt = '';
  messageObj.retweetId = '';
  messageObj.retweetIdStr = '';
  messageObj.retweetText = '';
  messageObj.retweetUserScreenName = '';
  messageObj.retweetCount = '';
  
  if (tweet.retweeted_status) {
    messageObj.retweetCreatedAt = tweet.retweeted_status.created_at;
    messageObj.retweetId = tweet.retweeted_status.id;
    messageObj.retweetIdStr = tweet.retweeted_status.id_str;
    messageObj.retweetText = tweet.retweeted_status.text;
    messageObj.retweetUserScreenName = tweet.retweeted_status.user.screen_name;
    messageObj.retweetCount = tweet.retweeted_status.retweet_count;
  }
  
  return JSON.stringify(messageObj)
}

exports.getTwitterMessage = getTwitterMessage;

sender.js
(트위터 메시지를 topic에 전달할 모듈)

var sendMessage = function(producer, msgArr, count) {

  console.log('==start sendMessage=================================================================');

  producer.on('ready', function () {
    console.log('-> producer ready');

    var p = count % 3;
    producer.send([
        {topic: 'twittopic', partition: p, messages: msgArr, attributes: 0 }
      ], function (err, data) {
          if (err) {
            console.log('-> Error : ' + err);
          }
          else {
            console.log('-> send %d messages', count);
            console.log('-> send to %d Partition', p);
          }

          producer.close(function() {
            console.log('-> close produce');
            console.log('==end sendMessage===================================================================');
          });
    });
  });

  producer.on('error', function (err) {
    console.log('error', err)
  });
};

exports.sendMessage = sendMessage;

index.js
(실행 모듈)

var kafka = require('kafka-node');

var tmgetter = require('./tmgetter');
var sender = require('./sender');
// producer를 별도의 모듈로 만들고 오브젝트를 생성하여 사용하고자 했으나
// 이렇게 하면 무슨 이유인지 producer가 작동하지 않는다.
//  producer의 ready 이벤트가 어떤 조건에서 발생하는지 모르겠다...ㅠ.ㅠ
//var producer = require('./producer');


tmgetter.getTwitterMessage(kafka, 20, sender.sendMessage);

일단 이렇게 소스는 조금 정리를 했다.
Producer쪽 로그도 조금 깔끔하게 다듬었다.



모니터링 툴

Kafka Manager


비교적 다양한 정보를 볼 수 있는 장점이 있다. 물론 그만큼 설정이
복잡하다. 물론 기본적인 설정만으로도 웬만한 정보는 확인 가능하며
지난 포스팅에 언급한 것 처럼 JMX 기반으로 정보를 확인하고자 하면
kafka 서버 실행시 다음과 같이 JMX 포트를 명시해주어야 한다.


env JMX_PORT=9999 ./kafka-server-start.sh ../config/server.properties


설치 및 실행은 지난 포스팅에 링크한 github 페이지를 참고하도록 하고
오늘은 화면 설명만 간단하게 하겠다.


클러스터 설정 화면


일단 가장 기본적인 Cluster Name을 정해주고
Cluster Zookeeper Hosts설정을 해준다. 제목대로 Zookeeper 노드의 
주소와 포트를 넣어주면 된다. 다음 Kafka Version을 선택하고 마지막으로 
JMX 관련 설정을 하면 기본적인 설정은 끝이다. 나머지 설정들은 기본값으로…
설정이 끝났으면 화면 맨 아래의 SAVE 버튼을 누르자.


설정을 저장하고나서 상단 Cluster메뉴에서 List를 선택하면
아주 심플한 화면에 방금 설정을 마친 클러스터 이름이 보인다.

이 클러스터 이름을 클릭하면 다음과 같은 화면이 나온다.


여전히 화면은 심플하지만 상단에 뭔가 메뉴가 잔뜩 생겼다.
사실 상단 메뉴의 대부분은 뭔가를 생성하거나 변경하는 내용들이라
아직 직접 사용해 보지는 않았다. 이제 친숙한 용어들을 눌러보자


예상했듯이 topic 목록이 나온다. 나는 토픽을 1개만 생성을 했기 때문에
하나의 항목이 보이고 보다시피 topic 이름은 ‘twittopic’이다.
내용을 보면 알겠지만 3개의 partition이 있고 5개의 broker가 구동중이며
3개의 복제본이 있다. 다른 항목들은 그냥 보면 되겠는데 
Brokers Skew %라는 것이 뭔지 도통 모르겠다.
topic 이름을 클릭해보자


뭔가 조금 복잡한 화면이 나왔지만 살펴보연 목록에 나왔던 내용들의

반복에 특정 시간 간격에서의 전송량 등 쉽게 알 수 있는 내용들이
표시된다. 특히 Partitions by Broker를 보면 topic의 각 partition이
전체 broker에 어떻게 복제가 되어있는지 알 수 있다.
여기도 역시 Skewed라는 항목이 있는데 skewed의 사전적 의미에
‘왜곡된’, ‘편향된’ 이라는 의미가 있고 Skewed가 id 2번의 브로커만
true이면서 3개의 partition의 복제본이 모두 존재하는 상황으로
미루어보건데 partition의 복제본이 정도 이상으로 집중된 상황을
표시하는 것 같다. 만일 복제본을 5개로 설정했다면 전체 Broker가
모두 Skewed 값이 true가 되고 Broker Skewed %도 100%가
되지 않알까 싶다. 요건 추후 테스트해봐야겠다.


만일 JMX를 사용하지 않게 되면 Metrics 영역에 보이는 회색 타원
영역의 값들이 표시되지 않는다.


Consumers consuming from this topic는 Consumer 그룹을
보여준다. 나는 하나의 컨슈머 그룹만 설정을 했고 그 이름을
kafka-node-group으로 했기에 하나만 보여지고 있다. 한 번 눌러보자


현재 topic partition이 3개이고 consumer가 하나의 그룹에 3개의
인스턴스로 존재하기 때문에 각각의 partition과 consumer는 1:1로
연결이 된다.


다시 돌아가서 이번에는 Broker를 한 번 확인해보자. 현재 3개의 
복제본이 모두 저장되어있는 2번 Broker를 클릭해보겠다.


역시 조금 복잡해보이지만 앞의 정보들이 반복해서 보여진다.
처음으로 그럴듯한 챠트가 하나 나왔다. 시간 흐름에 따라 처리되는
메시지의 양을 보여주는 그래프다. Per Topic Detail영역을 보면
토픽에 대한 정보를 볼 수 있는데 앞의 메뉴에서 이미 확인된 내용들이다.


이렇게 뭔가 화면이 많고 복잡해 보이기는 하나 대부분 중복된
내용이 많다. 그리고 JMX를 이용하지 않을 경우 처리량에 해당하는 정보
(회색 타원 영역과 그래프 등)는 볼 수 없다.


Kafka Consumer Offset Monitor


이 툴은 이름에서도 알 수 있듯이 Cunsumer가 message를 소비하는데
초점이 맞춰져있다. 굉장히 심플하다. 별다른 설정도 필요 없다.
메인 메뉴는 다음과 같다.


Consumer Groups를 선택한 화면이다.
앞서 언급한 것처럼 나는 kafka-node-group이라는 하나의 그룹만을
가지고 있다.


kafka-node-group을 눌러보자



아주 깔끔하게 Broker 목록과 Consumer의 message 소비 상황이보인다.


전체 Broker는 5대의 node(Mac mini)에서 실행되고 있지만 
Consumer는 2, 3, 4번 Broker에서만 실행되고 있기 때문에 Broker
목록에 2, 3, 4번만 보인다.


Consumer Offsets 영역에는 partition 번호와 소비한 message
offset과 size 등의 정보가 보여진다. 그리고 topic 이름인 twittopic을
클릭하면 아래와 같은 그래프가 표시된다.



zoom에서 선택한 시간 동안의 message 소비의 변화량을 보여준다.


메인 메뉴의 2번째 메뉴인 Topic List 메뉴도 결국에는 이 화면으로 
들어오게 된다.


마지막의 Visualizations 메뉴는 클러스터의 구조를 트리 형태로
보여주는 메뉴이며 다음 2개의 화면을 제공한다.


이상과 같이 이번 포스팅에서는 간단하게 2개의 kafka 모니터링 툴에 대해
알아보았다. 아직 잘 모르는 항목들과 기능 버튼들이 있는데 이런 부분들은
추후 확인되는대로 내용을 보완해나가도록 하겠다.


저작자 표시
신고



Kafka 무작정 실행하기 - 2


58의 비밀

먼저 지난 번 마지막에 언급했던 58이란 숫자의 비밀을 밝혀보자.
사실 정확한 원인은 아직 확인 못했다. 다만 지난 번 코드의 구현이 ’트위터
메시지가 하나 들어올 때마다 producer 하나를 만들어 트위터 메시지를
topic에 보낸다’
는 것이었다.


이 과정에서 의심할 수 있는 것은 매번 producer를 만들어 커넥션울 하게 되니
아마도 이 커넥션 수에 제한이 걸려버린 것이 아닐까 하는 부분이었다.


그래서 일단 직감에 의존해 producer에서 topic으로 메시지를 보낸 후
API의 close 함수로 연결을 끊어보았다. 예상이 적중하였는지 이후로는
58개의 제한에 걸리지 않고 트위터에서 받아오는 모든 메시지들이
정상적으로 전송이 되었다.



성능 관리

겨우 이만큼의 구현과 설정을 해놓고 성능 관리를 논하는 것은 우스운 일이지만
일단 트위터 메시지 한 건에 커넥션 하나를 만들었다가 끊는다는 것은 아무리
생각해도 그리 바람직하지는 않다고 판단하였다. 


그리서 우선 아쉬운대로 트위터 메시지 10건이 수신되면 그 10개를 하나의
배열로 하여 producer를 통해 보내도록 하였다. 우선은 시험 삼아
이정도 처리를 한 것이고 시스템 환경에 따라 아마도 이러한 수치를 세밀하게

조절을 해야 할 것이다.


그리고 실제로 topic으로 메시지를 보내는 위치에서 파티션(나는 3개의 
partition 나뉜 1개의 topic을 만들었다)에 고르게 분산시키기 위해
파티션의 인덱스를 계산하는 식을 하나 추가하였다.


  • 처음에는 손이 덜가는 HighLevelProducer를 사용했었으나 뭔가 잘
    작동하지 않는 듯하여 그냥 Producer로 바꾸었다.

수정된 소스

일단 지난 시간의 문제점을 수정하고 몇가지 필요한 내용을 추가한
소스를 다음과 같이 만들었다.

var kafka = require('kafka-node');


var Twitter = require('twitter');
var count = 0;


var client = new Twitter({
  consumer_key: ‘…’,
  consumer_secret: ‘…’,
  access_token_key: ‘…’,
  access_token_secret: ‘…’,
});

var msgArr;

/**
 * Stream statuses filtered by keyword
 * number of tweets per second depends on topic popularity
 **/
client.stream('statuses/sample', {track: ''},  function(stream){
  stream.on('data', function(tweet) {
    if (tweet.lang == 'kr' || tweet.lang == 'ko') {
      if (count % 10 == 0) {
        msgArr = new Array();
      }

      var messageObj = {};

      messageObj.createdAt = tweet.created_at;
      messageObj.id = tweet.id;
      messageObj.idStr = tweet.id_str;
      messageObj.text = tweet.text;
      messageObj.UserScreenName = tweet.user.screen_name;

      messageObj.retweetCreatedAt = '';
      messageObj.retweetId = '';
      messageObj.retweetIdStr = '';
      messageObj.retweetText = '';
      messageObj.retweetUserScreenName = '';
      messageObj.retweetCount = '';

      if (tweet.retweeted_status) {
        messageObj.retweetCreatedAt = tweet.retweeted_status.created_at;
        messageObj.retweetId = tweet.retweeted_status.id;
        messageObj.retweetIdStr = tweet.retweeted_status.id_str;
        messageObj.retweetText = tweet.retweeted_status.text;
        messageObj.retweetUserScreenName = tweet.retweeted_status.user.screen_name;
        messageObj.retweetCount = tweet.retweeted_status.retweet_count;
      }

      var messageStr = JSON.stringify(messageObj)
      console.log(messageStr);
      console.log('===================================================================' + count);

      msgArr.push(messageStr);

      if (count % 10 == 9) {
        console.log('==call sendMessage====================================================');
        var Producer = kafka.Producer;
        var Client = kafka.Client;
        var client = new Client('NAMENODE.local:2181,SECONDARY-NAMENODE.local:2181,DATANODE1.local:2181,DATANODE2.local:2181,DATANODE3.local:2181');

        var topic = 'twittopic';
        var producer = new Producer(client, { requireAcks: 1 });

        sendMessage(producer, msgArr, count);
      }
      count++;
    }
  });

  stream.on('error', function(error) {
    console.log(error);
  });
});

var sendMessage = function(producer, msgArr, count) {

  console.log('==sendMessage=================================================================');

  producer.on('ready', function () {
    console.log('==producer ready=========');
    console.log('send start');

    var p = (count - 9) % 3;
    producer.send([
      {topic: 'twittopic', partition: p, messages: msgArr, attributes: 0 }
    ], function (err, data) {
      if (err) console.log(err);
      else console.log('send %d messages', count);

      producer.close(function() {
        console.log('close produce');
      });
    });
  });


  producer.on('error', function (err) {
    console.log('error', err)
  });
};

모니터링

일단 내용 구현을 하고 실제 작동하는 것 까지 확인을 하였으니 이제제대로
뭔가를 하고 있는 것인지 궁금해졌다. 일단 Kafka 자체적으로는 달리 웹으로 
구현된 관리 화면을 제공하지 않기에 잘 알려진 모니터링 도구 2가지를 
설치해보았다.


Kafka Offset Monitor : http://quantifind.github.io/KafkaOffsetMonitor/

Kafka Manager : https://github.com/yahoo/kafka-manager


이중 Kafka Manager는 Kafka에서 제공하는 JMX 인터페이스를 이용하는
툴로 이 툴을 이용하기위해서는 Kafka 서버 시작 시 JMX 포트를 함께 지정해
주어야 한다.

env JMX_PORT=9999 ./kafka-server-start.sh ../config/server.properties


다음은 각 모니터링 툴의 실제 화면이다.

  • Kafka Offset Monitor


  • Kafka Manager

그리고 다음은 node.js를 통해 실행한 Producer와 Consumer의 콘솔 화면이다.

아직 Consumer에서는 별다른 메시지 가공은 없이 topic에서 가져온 메시지를
로그로 출력만 하고 있다.


ProducerConsumer1 (DATANODE1)

Consumer2(DATANODE2)Consumer3 (DATANODE3)

현재 구성이 topic이 3개의 partition으로 되어있고 3개의 Consumer가 하나의
Consumer 그룹(kaka-node-group : kafka-node에서 자동 생성한 그룹)에
3개의 Consumer가 있기 때문에 각각의 partition과 Consumer는 다음과 같이
1:1로 연결이 되고 있다.


Consumer1 - partition 0
Consumer2 - partition 2
Consumer3 - partition 1


다음 포스팅에서는 현재 내가 구성하고 있는 시스템의 구조를 설명하고
Consumer 중단 시에 rebalancing이 일어나는 모습을 코솔 로그를 통해
확인해보겠다.

저작자 표시
신고



Kafka 무작정 실행하기


참고한 자료들

지난 포스팅까지 Kafka의 개요를 알아보았다.
물론 수박 겉핥기 수준의 내용이었지만 더이상의 자세한 내용은
생략할 수밖에 없을 것 같다.


이미 많은 블로그에 보다 자세하고 정확한 내용들이 올라와있어
또다시 이 작업을 하는 것은 소모적인 일이 될 것 같아서이다.
게다가 내 짧은 지식으로는 잘못된 정보를 전달할 가능성도 높고…ㅠ.ㅠ


간단하게 참고한 블로그를 소개하자면 다음과 같다.


http://epicdevs.com/17
http://blog.embian.com/category/Apache%20Kafka
http://blog.jdm.kr/208
http://wiki.intellicode.co.kr/doku.php?id=개발:kafka


무작정 달려들기

일단 내가 kafka에 관심을 가지게 된 것은 twitter의 streamin API를
통해 가져오는 데이터를 1차 가공 후 HADOOP이나 HBase에 적재
하기 위해서였다.


어찌어찌 node.js 모듈을 구해서 twitter 샘플 데이터를 가져오는 것은
만들어놓았는데 문제는 적재 전에 가공을 하는 부분이었다.
대략 초당 2~3건 이상 들어오는 데이터를 바로바로 적재하는 것도
쉽지 않을 것 같은데 1차 가공을 해서 적재를 하려고 하니 난감했던
것이다. 그러던 차에 발견한 것이 kafka였고 얼핏 보기에 내 목적을 
충족 시켜줄 수 있을 것으로 판단되었다.


간단하게 개념을 익힌 후 기왕에 twitter 처리가 node.js 모듈로 구현
되었기에 kafka역시 node.js 모듈로 붙여보기로 했다. 
kafka 홈페이지의 클라이언트 메뉴에 있는 node.js 항목에서
kafka-node녀석을 사용하기로 했다. 이유는? 문서가 가장 잘 만들어져
있어서였다. 


(kafka-node : https://github.com/SOHU-Co/kafka-node/)


샘플 코드를 보고 구현을 해보았으나 node.js에 대한 지식도 부족한터라
생각처럼 잘 진행이 되지 않았다.


Node.js에서 막히다…

내가 구상했던 것은 당연히 다음의 진행이다.


  1. twitter 데이터 가져오기 
  2. Producer에서 메시지(1번의 twitter 데이터) 전달 
  3. topic에 적재
  4. Consumer에서 메시지 가공

그런데…
Node.js의 비동기 처리가 날 당황스럽게 했다.


우선은 기존의 twitter streaming API 처리를 하던 js에 kafka-node의
producer 샘플을 그대로 추가했다. producer는 비교적 설정이
단순한 HighLebelProducer를 사용하였다.
최초의 코드는 다음과 같이 구현되었다.


var kafka = require('kafka-node');
var Twitter = require('twitter');
var count = 0;


var client = new Twitter({
  consumer_key: '...',
  consumer_secret: '...',
  access_token_key: '...',
  access_token_secret: '...',
});

/**
*  Stream statuses filtered by keyword
*  number of tweets per second depends on topic popularity
**/
client.stream('statuses/sample', {track: ''},  function(stream){
  stream.on('data', function(tweet) {
      if (tweet.lang == 'kr' || tweet.lang == 'ko') {
        var HighLevelProducer = kafka.HighLevelProducer;
        var Client = kafka.Client;
        var client = new Client('NAMENODE.local:2181,\
            SECONDARY-NAMENODE.local:2181,\
            DATANODE1.local:2181,\
            DATANODE2.local:2181,\
            DATANODE3.local:2181');
        var topic = 'twittopic';
        var producer = new HighLevelProducer(client);
        console.log(tweet);
        console.log('===================================' + count);

        sendMessage(producer, tweet, count);

        count++;
      }
    });

    stream.on('error', function(error) {
    console.log(error);
  });
});

var sendMessage = function(producer, message, count) {
  console.log('==sendMessage=========================');

  producer.on('ready', function () {
    console.log('==producer ready=========');
    setInterval(send, 500);
  });

  function send() {
    console.log('send start');
    producer.send([
        {topic: 'twittopic', messages: [message.id] }
      ], function (err, data) {
      if (err) console.log(err);
      else console.log('send %d messages %s', count, message.id);
    });
  }

  producer.on('error', function (err) {
    console.log('error', err)
  });
};


이렇게 구현을 하는 동안 다음의 문제에 막혔다.


  • producer 객체를 언제 생성해야 하는가?

우선은 당연히 var kafka = require('kafka-node'); 이후에 이어서
관련 객체들을 생성해야 한다고 생각해서 그렇게 진행을 했더니
producer가 전혀 작동을 하지 않았다. 왜인지 아직도 원인은 모르겠다.


이후 twitter 데이터를 가져오는 부분이 
‘stream.on('data', function(tweet)’이고, ’data’이벤트가 발생하면 
콜백함수가 실행되고 이 콜백함수의 파라미터로 twitter의 데이터가 
전달된다. 따라서 나는 데이터가 전달되는 이 위치에서 producer 처리를 
해야 한다고 판단했다. 그래서 현재와 같이 구현을 하였고 일단 동작은
하였다.


  • setInterval의 있고 없음의 차이

보면 알겠지만 sendMessage 함수 내에 구현된 producer역시
‘ready’ 이벤트 발생시 콜백함수가 실행되는 구조이다. 그리고
이 안에 실제 메시지를 보내는 함수를 호출하는 코드가 구현되어있다.


그런데 이 구현에서 setInterval을 사용을 하게 되면 twitter의 콜백이
한 번 호출할 때마다 0.5초 동안 send 함수가 수십차례 호출이 되는
전형적인 비동기 호출의 문제에 빠졌다.


앞서 말했듯이 나는 twitter 메시지 한개가 들어오면 producer에서
그 메시지 1번만 보내는 것을 원했다.


다행히 아주 직관적으로 setIntervar을 제거하니 얼추 동기화가 되었다.


  • 58 제약?

일단 setInterval을 제거하고 호출을 하니 동기화는 되었는데
또다른 문제가 발생을 하였다. 58건의 메시지를 보내고 나면 더이상
메시지의 send가 이루어지지 않는 것이었다. 게다가 consumer도
하나만 메시지를 수신하고 partition 0번만 처리를 하고 있는 것이다.


혹시나 해서 setInterval을 다시 넣어보니 이 경우에도 0.5초동안
전송되는 횟수는 58회였다.


뭔지는 모르겠지만 이정도 수치에서 뭔가 제약이 걸리는 것 같은데
이유를 모르겠다(다음 포스팅에 해결책만 적겠다).


어쨌든 여기까지로 twitter로 메시지를 받고 producer로 topic에
쌓고 consumer로 메시지를 소비하는 것까지의 기본 동작은 확인을
했다. 


다음 포스팅에서는 이 소스를 약간 변형시켜서 애초에 구상했던
내용에 근접하는 동작을 구현한 코드를 가지고 이야기를 풀어나가자. 

저작자 표시
신고



Consumer


일반적으로 메시징 시스템은 queuing 기반 모델과 publish-subscribe 
기반의 모델로 나누어 볼 수 있다.


queuing기반의 메시징 시스템은 sender가 queue에 쌓아 놓은 메시지를 
pool에 있는 receiver중 하나에 각각 할당하는 방식이며 이 과정은 
비동기적으로 이루어진다. (point-to-point)


publish-subscribe기반의 메시징 시스템은 publisher가 메시지를 생성한 후
subscriber에게 broadcasting해준다.


Kafka는 consumer group이라는 개념을 만들어 이 두가지 방식을 종합하고 있다.


토픽으로 발행된 메시지들은 분산된 프로세스나 장비에 있는 consumer 그룹 내의 consumer 중 

오직 하나에게만 전달된다.



이 때, consumer들이 1개의 그룹으로만 묶여있으면 전통적인 queuing 방식으로 작동하고 

( consumer group = consumer pool) 모든 consumer가 서로 다른 그룹에 있으면 

publish-subscribe방식으로 작동하여 메시지들이 전체 consumer에게 broadcasting된다.


아래 그림에서 Consumer Group A가 없다고 생각해보자 Server 1과 Server 2에 있는 각각의 

topic partition들은 Group B 내의 consumer instance들과 1:1로 대응된다. 즉 point-to-point 방식의 

queuing이 되는 것이다.



하지만 그림과 같이 2개의 consumer 그룹이 있으면 각각의 partition들은 각 그룹으로 broadcasting을 

하게 되는 것이다.



이미지 출처1


간단하게 말하자면 이 구조는 구독자가 consumer의 클러스터로 되어있는 경우에는 결국 publish-subscribe 모델과 

다를바가 없다고 볼 수 있다.


Kafka는 전통적인 메시지 시스템보다 강력한 순차적 처리를 보장한다.


전통적인 queue 역시 메시지를 서버에 순차적으로 저장하고 다수의 consumer가 queue로부터 메시지를 사용할 때
저장된 순서에 따라 consumer들에게 메시지를 전달하지만 메시지 전달이 비동기적으로 일어나기 때문에 서로 다른 
consumer에 순서에 맞지 않는 메시지가 도착할 수도 있다.


이러한 동작은 사실상 병렬로 메시지가 사용 됨으로 인해 메시지 정렬의 의미가 퇴색하는 것이다.


종종 이런 상황을 회피하기 위해 ’배타적 consumer’라는 개념을  두어 queue에서 메시지를 읽을 때 단일 프로세스만 

사용하도록 하는 경우도 있는데 이는 당연히 병렬 처리를 못하게 되는 결과를 가져온다.


Kafka는 순차적인 처리를 더 잘 해낼 수 있다. topic 내에 partition이라는 병렬 처리의 개념을 둔 것이다.
이로인해 Kafka는 순처적인 처리에 대한 보장과 pool로 구성된 consumer들의 처리에 대한 load balancing을 

모두 제공할 수 있다.


이전 내용의 그림을 다시 보자


이미지 출처2


이러한 이점은 topic 내의 partision들을 consumer 그룹에 할당을 하고 각각의 partition들은 그룹 내에서 반드시 

하나의 consumer에 의해서만 사용되도록 함으로써 얻을 수 있다. 


이를 통해 어떤 consumer가 유일하게 특정 partition으로부터만 메시지를 소비하고 있으며 이로 인해 partition에 

정렬된 순서에 따라 메시지를 소비하고 있다는 것을 확신할 수 있는 것이다. 게다가 이런 과정이 진행되는 동안 

부하의 균형이 유지될 수 있다. 다만 consumer 그룹 내에 partition보다 많은 consumer들이 있게 되면 노는 

consumer들이 발생하니 주의가 필요하다.


Kafka는 동일 partition 내에서만 순서를 보장한다. 위의 그림에서 보자면 C3의 경우 P0와 1:1로 연결되어있기 때문에
순차적인 처리를 확실하게 보장 받을 수 있지만 C1의 경우 P0와 P3의 메시지를 함께 받고 있으며 이 두 개의 partition을
포괄하는 순차 처리는 보장 받을 수 없게 되는 것이다.



만일 전체적인 범위에서의 메시지 정렬이 필요하다면 topic이 단지 하나의 partition만을 갖도록 하여 처리할 수 있디.
하지만 이럴 경우 consumer 그룹 당 하나의 consumer 프로세스만 있게 되는 것이다.


Guarantee

높은 수준의 Kafka 구현은 다음과 같은 내용을 보장한다.


  • 메시지는 보내진 순서대로 추가된다. 즉 동일한 producer에서 보내진 message M1과 
    message M2가 있고 M1이 먼저 보내졌다면 M1이 M2 보다 낮은 offset을 갖고 로그에도 먼저 나온다.
  • consumer 인스턴스는 로그상에 정렬된 순서대로 메시지를
    인지한다.
  • N개의 복제 팩터를 가진 topic에 대해 메시지 손실 없이 N-1대의
    서버 장애를 허용한다.
  1. 이미지 출처 : Kafka 공식 홈페이지 (http://kafka.apache.org/documentation.html#introduction↩︎
  2. 이미지 출처 : Kafka 공식 홈페이지(http://kafka.apache.org/documentation.html#introduction↩︎


저작자 표시
신고

Distribution

topic의 partition들은 Kafka 클러스터를 구성하는 서버들에 분산 저장이 된다.
partition들은 내고장성을 위해 여러 서버에 복제되며 복제되는 서버의 수를 설정
할 수 있다.


각 partition은 1대의 leader 서버와 0대 이상의 follower 서버들로 구성된다.

(즉, leader 서버 1대로도 Kafka 사용이 가능하다.)

leader는 읽기 쓰기가 모두 가능하고 follower들은 leader의 데이터를 복제한다.


만일 leader가 고장나면 follower 중 한 대를 leader로 선출한다.
이런 구조로 클러스터 내에서의 부하가 적절히 분산된다.


지난 포스팅 (Apache kafka 시작하기)에서의 기억을 더듬어보면
일단 클러스터 내의 복제 서버는 총 5대로 설정되어있다. zookeeper의
server.0 ~ server.4 설정과 Kafka server.properties에서의
zookeeper.connect 설정에 맥미니 5대의 host name이 설정된 것을
확인 할 수 있다.


그리고 이 상태에서 다음과 같이 토픽을 생성할 때 처음에 오류가 발생을 했었다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --create --zookeeper SECONDARY-NAMENODE.local:2181 --replication-factor 5 --partition 20 --topic test 


원인은 SECONDARY-NAMENODE.local은 leader가 아니었고 그렇기 때문에
복제만 가능하고 write가 불가능했던 것이다. 
결국 leader인 NAMENODE.local을 이용하여 topic 생성에 성공했다.


그리고 이렇게 생성된 topic은 5대의 서버의 로그 디렉토리 
(server.propertiesd에 log.dirs로 설정된 위치 - 나의 경우 /tmp/kafka-logs)에
다음과 같이 뭔가 생성된 것을 확인할 수 있었다.



Producer

Producer는 선택한 topic으로 데이터를 발행한다.
Producer는 메시지를 발행할 topic을 선택하는 책임을 지는데
전통적인 round-robin 방식을 이용하여 균등하게 발행을 하거나
아니면 메시지의 어떤 키를 이용하여 특정 partition에는 특정 키를 가진
메시지만을 보낼 수 있다.

저작자 표시
신고

Apache Kafka 개요


지난 글에서 kafka설치 및 설정, 그리고 서버 기동과 간단한 테스트를
진행해 보았다.


오늘은 kafka의 소개 내용을 간단하게 요약해보겠다.
이미 많은 블로그에 원문에 대한 번역에서부터 심층 분석까지 다양한
자료들이 포스팅 되어있으니 나는 그냥 개요만 짚어보련다~


kafka 공식 홈페이지의 indroduction을 보면 다음과 같은 내용으로 시작한다.

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.


뭐 거의 동어 반복이다. 로그 저장에 대한 분산과 분할과 복제. 그리고 메시징 시스템인데
독특하게 디자인 되었단다. 일단 분산, 분할, 복제에 돼지꼬리 땡땡 하고...


조금 더 구체적으로 가보자

  • 메시지는 topic이라고 불리는 일종의 카테고리를 통해 관리된다.
  • producer를 통해 Kafka의 topic으로 메시지를 발행한다.
  • consumer를 통해 topic으로부터 생성된 메시지들을 구독한다.
  • Kafka는 broker라고 불리는 하나 이상의 서버 클러스터로 실행된다.

Kafka 구성1

지난 번 포스팅의 테스트 과정과 비교하면서 살펴보자
  • “kafka-topics.sh --create…”를 통해서 test라는 이름의 topic을 생성.
  • “kafka-console-producer.sh --broker-list…”를 통해 메시지를 생성.
  • “kafka-console-consumer.sh --zookeeper…”를 통해 메시지를 확인.

broker에 대해서는 2번째 kafka-console-producer.sh 진행 시 --broker-list라는
옵션을 통해 메시지를 broker로 전달했다는 것을 알 수 있다.


이런 과정들 (클라이언트와 서버간의 통신)는 단순하고, 고성능이며 언어에 구애받지 않는
TCP 프로토콜을 통해 이루어진다.


또한 Kafka에서는 기본적으로 java 클라이언트를 제공하지만 다양한 언어들을 이용하여
클라이언트를 구현할 수 있다. (다음 링크에서 사용 가능한 클라이언트 언어를 확인할 수
있다 : https://cwiki.apache.org/confluence/display/KAFKA/Clients )


각각의 개념에 대해 좀 더 상세하게 알아보자


Topic과 로그

토픽은 Kafka 클러스터에 의해 partition으로 분할되어 관리된다. 
아래 그림과 같음)


토픽 개념도2


각 파티션에는 메시지들이 순차적으로 쌓인다.
이 메시지들은 정렬되어 저장되며 각각을 구분할 수 있는 offset이라는 ID가 
부여된다.


이렇게 쌓인 메시지들은 사용되었는지 여부와 상관 없이 일정 시간동안 유지된다.
이런 기능들은 데이터 크기를 효율적으로 사용할 수 있게 해준다.

유지 시간은 Kafka설정에이 있으며 기본값은 168(7일)시간이다
설정 항목은 log.retention.hours이다.


각각의 consumer에도 offset이라고 하는 메타데이터가 있어 메시지의
offset과 관련하여 작동한다.


partition 분할을 통해 데이터 용량의 확장과 병렬 처리의 잇점을 얻을 수 있다.

  1. 이미지 출처 : http://kafka.apache.org/documentation.html#introduction ↩︎
  2. 이미지 출처 : http://kafka.apache.org/documentation.html#introduction ↩︎


저작자 표시
신고



Apache Kafka 시작하기


참조 사이트

http://kafka.apache.org
http://epicdevs.com/20

다행이 예전에 Hbase를 설치할 때 zookeeper를 설치해놓은 덕에 kafka를 설치하고 구동하는 과정은 

그리 어렵지 않았다.아다시피 요즘 OS가 Windows만 아니면 binary 패키지를 다운로드 받고 적절한 위치에 

압축 풀고 하면반은 된 것이나 다름 없다.


kafka의 경우 $KAFKA_HOME/config/server.properties에 몇가지 설정만 한 후 기동시키면 된다.
다음은 내 PC 환경에서의 설정 및 구동 과정이다.


PC 환경

현재 맥미니 5대를 내부망으로 연결시킨 상태로 각각의 PC에 대한 정보는
/etc/hosts 파일에 설정이 되어있다. 대략 다음과 같다.

172.30.1.33     NAMENODE.local
172.30.1.56     SECONDARY-NAMENODE.local
172.30.1.59     DATANODE1.local
172.30.1.39     DATANODE2.local
172.30.1.11     DATANODE3.local

때문에 대부분의 설정에서는 host name인 NAMENODE.local 등을 이용한다. 이후
설정 사항에 이점 참고하길 바란다.


zookeeper 설정

zookeeper는 5대의 노드 모두 설치가 되어있으며 대략 다음과 같이 
설정이 되어있다. 설정 파일은 $ZOOKEEPER_HOME/cont/zoo.cfg이다. 
물론 5대의 노드 모두 동일한 설정이다.

dataDir=/zookeeper
clientPort=2181

#Zookeeper Servers
server.0=NAMENODE.local:2888:3888
server.1=SECONDARY-NAMENODE.local:2888:3888
server.2=DATANODE1.local:2888:3888
server.3=DATANODE2.local:2888:3888
server.4=DATANODE3.local:2888:3888

zookeeper의 실행 명령은 다음과 같다. 
(zookeeper 설치 위치는 zookeeper-3.4.5다

NAMENODE1:bin mazdah$ /zookeeper-3.4.5/bin/zkServer.sh start

kafka 설치, 설정, 구동

  • 설정

앞서 말했듯이 다운로드 압축해제 적절한 위치 복사면 모든 설치는 끝이다. 내가 받은 버전은 

kafka_2.11-0.9.0.1이며 /kafka에 설치했다.


우선 설정파일을 열고 다음과 같이 설정을 하였다. 설정 파일은 /kafka/config/server.properties다.
가장 먼저 설정해주어야 할 부분은 broker.id이다. 

나는 총 5대의 노드가 있기 때문에 각각의 PC(node)에 0부터 4까지 아이디를 부여해주었다.
예를들어 NAMENODE.local의 설정은 다음과 같다.

broker.id=0

다음으로는 zookeeper에 대한 설정으로 zookeeper.connect라는 항목에 
앞서말한 5개의 노드를 모두 적어주었다.

zookeeper.connect=NAMENODE.local:2181,SECONDARY-NAMENODE.local:2181,DATANODE1.local:2181,DATANODE2.local:2181,DATANODE3.local:2181

나는 처음에 이 부분에 적는 2181 포트가 도대체 어떤 포트인가 고민하다가 나중에 zookeeper의 cleintPort 

설정을 보고는 이해했다…-.- 그밖의 나머지 설정은 그냥 default로 두었다.


나중에 추가한 설정이 있는데 이 것은 나중에 상황을 설명하면서 함께 이야기 하겠다.

  • kafka 서버 기동

이제 kafka 서버를 기동해보자. 각각의 노드에서 /kafka/bin으로 이동한 후 다음과 같이 입력하여 기동한다.

NAMENODE:bin mazdah$ ./kafka-server-start.sh ../config/server.properties

아래와 같이 kafka 설정 정보들이 죽 나오고 이후 서버 start 메시지가 출력되면서 기동된다.

[2016-03-20 11:38:57,142] INFO KafkaConfig values: 
advertised.host.name = null
metric.reporters = []
quota.producer.default = 9223372036854775807
offsets.topic.num.partitions = 50
.
.
.

이후 테스트는 http://epicdevs.com/20의 내용을 그대로 진행해보았다.
물론 서버 정보 등은 나의 환경에 맞춰서..

  • topic 생성

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --create --zookeeper SECONDARY-NAMENODE.local:2181 --replication-factor 5 --partition 20 --topic test

그런데 처음부터 문제가 생겼다. 위와같이 시작을 했는데 아래와 같은 오류가 발생을 했다.

Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 30000
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1223)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:155)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:129)
at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:89)
at kafka.utils.ZkUtils$.apply(ZkUtils.scala:71)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
at kafka.admin.TopicCommand.main(TopicCommand.scala)

ps 명령어로 각 노드에서 확인을 해보아도 모든 zookeeper 인스턴스들이 정상적으로 떠있는 상태였는데 

zookeeper 연결시 타임아웃으로 연결할 수 없다니…ㅠ.ㅠ


일단 퍼뜩 떠오르는 생각은 zookeeper의 leader가 NAMENODE.local에 설정되어 있다는 것이다. 

아마도 topic 생성 등의 중요 작업은 zookeeper leader 노드에서 해야 하는 것 같다. 

다음과 같이 NAMENODE.local에서 topic이 생성되도록 하니 정상적으로 topic이 생성되었다.
(실제로 zookeeper의 leader node에서 실행하지 않아도 —zookeeper 옵션만leader node로 해주어도 된다). 
다음과 같이 성공하였다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --create --zookeeper NAMENODE.local:2181 --replication-factor 5 --partition 20 --topic test

Created topic "test".
  • topic 리스트 보기

자 그러면 이제 topic 리스트를 한 번 보도록 하자.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --list --zookeeper NAMENODE.local:2181

test

아직은 만들어진 topic이 test 하나밖에 없기 때문에 test만 출력된다.

  • topic 상세 보기

그리고 다음과 같이 test topic의 상세 내용을 확인해보자.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --describe --zookeeper NAMENODE.local:2181 --topic test

Topic:test	PartitionCount:20	ReplicationFactor:5	Configs:
Topic: test	Partition: 0	Leader: 0	Replicas: 0,1,2,3,4	Isr: 0,1,2,3,4
Topic: test	Partition: 1	Leader: 1	Replicas: 1,2,3,4,0	Isr: 1,2,3,4,0
Topic: test	Partition: 2	Leader: 2	Replicas: 2,3,4,0,1	Isr: 2,3,4,0,1
Topic: test	Partition: 3	Leader: 3	Replicas: 3,4,0,1,2	Isr: 3,4,0,1,2
Topic: test	Partition: 4	Leader: 4	Replicas: 4,0,1,2,3	Isr: 4,0,1,2,3
Topic: test	Partition: 5	Leader: 0	Replicas: 0,2,3,4,1	Isr: 0,2,3,4,1
Topic: test	Partition: 6	Leader: 1	Replicas: 1,3,4,0,2	Isr: 1,3,4,0,2
Topic: test	Partition: 7	Leader: 2	Replicas: 2,4,0,1,3	Isr: 2,4,0,1,3
Topic: test	Partition: 8	Leader: 3	Replicas: 3,0,1,2,4	Isr: 3,0,1,2,4
Topic: test	Partition: 9	Leader: 4	Replicas: 4,1,2,3,0	Isr: 4,1,2,3,0
Topic: test	Partition: 10	Leader: 0	Replicas: 0,3,4,1,2	Isr: 0,3,4,1,2
Topic: test	Partition: 11	Leader: 1	Replicas: 1,4,0,2,3	Isr: 1,4,0,2,3
Topic: test	Partition: 12	Leader: 2	Replicas: 2,0,1,3,4	Isr: 2,0,1,3,4
Topic: test	Partition: 13	Leader: 3	Replicas: 3,1,2,4,0	Isr: 3,1,2,4,0
Topic: test	Partition: 14	Leader: 4	Replicas: 4,2,3,0,1	Isr: 4,2,3,0,1
Topic: test	Partition: 15	Leader: 0	Replicas: 0,4,1,2,3	Isr: 0,4,1,2,3
Topic: test	Partition: 16	Leader: 1	Replicas: 1,0,2,3,4	Isr: 1,0,2,3,4
Topic: test	Partition: 17	Leader: 2	Replicas: 2,1,3,4,0	Isr: 2,1,3,4,0
Topic: test	Partition: 18	Leader: 3	Replicas: 3,2,4,0,1	Isr: 3,2,4,0,1
Topic: test	Partition: 19	Leader: 4	Replicas: 4,3,0,1,2	Isr: 4,3,0,1,2

topic 생성시 partition을 20으로 지정했기 때문에 Partition: 0부터  Partition: 19까지 20개의 목록이 보이게 된다. 

각 항목에 대해서는 추후에 설명하기로 하고 대략 이런 모습이란 것만 확인하고 넘어가자.

  • producer를 이용한 메시지 생성

다음과 같이 입력하여 message룰 생성해보자.
이 명령에서 보여지는 9092포트는 kafka 설정 파일인 server.properties에 “listeners=PLAINTEXT://:9092”로 

설정이 되어있다. 실행하면 빈 프롬프트가 생기고 적절한 메시지를 입력하면 된다. 


입력한 후 엔터를 치면 다음 메시지를 입력할 수 있다. message1에서 message6까지 메시지를 생성해보았다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-console-producer.sh --broker-list NAMENODE.local:9092 --topic test

message1
message2
message3
message4
message5
message6
  • cunsumer를 이용한 message 확인

다음과 같이 consumer를 실행하여 producer에서 생성한 메시지를 확인한다. 입력한 순서대로 출력되지 않는 것이 

아마도 Map 형식의 처리인 것으로 보인다. (가운데 빈 라인은 message 입력 제일 마지막에 엔터를 한 번 입력해서 

그렇다)

SECONDARY-NAMENODE:bin mazdah$ ./kafka-console-consumer.sh --zookeeper NAMENODE.local:2181 --topic test -rom-beginning

message5
message3

message1
message2
message4
message6
  • topic의 삭제

그냥 테스트로 만든 topic이니 미련 없이 삭제해보자. test이후에 test2라는 topic을 하나 더 만든 상태이다.
topic의 삭제는 생성과 동일한 구문으로 수행을 하며 —create 대신 —delete를 사용하면 된다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --delete --zookeeper NAMENODE.local:2181 --replication-factor 5 --partition 20 --topic test2

Topic test2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

그런데 여기서도 이상한 부분이 발견되었다. 마지막 줄에 보면 다음과 같은 내용이 있다.
Note: This will have no impact if delete.topic.enable is not set to true.

혹시나 해서 topic 리스트를 확인해보았다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --list --zookeeper NAMENODE.local:2181

test
test2 - marked for deletion

역시나 삭제한 test2 토픽에 marked for deletion라는 내용이 붙어있다. 어찌된 일일까? 

짐작했겠지만 답은 삭제시 보여진 NOTE의 내용에 있다. 설정 파일에 delete.topic.enable항목을 추가하고 

값을 true로 설정하면 된다.

delete.topic.enable=true

이후 다시 목록을 확인해보면 다음과 같이 말끔히 지워진 것을 확인할 수 있다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --list --zookeeper NAMENODE.local:2181

test

일단 오늘은 여기까지 기본적인 내용을 확인할 수 있었다.
다음 주에는 본격적으로 kafka에 대해 하나 하나 정리를 해보아야겠다.

저작자 표시
신고


이제야 발견한 Kafka


하던 일도 제대로 못하면서 빅데이터 공부해보겠다고 꼴깝을 떤 것이 
벌써 2013년 5월달 이야기네…


뭔가 새로운 것에 대해서는 남들 앞에서 한 마디나마 
거들 수 있어야 하지 않을까 하는 밑도 끝도 없는 초조감이
나를 뻘짓거리의 함정으로 이끌었다.


뭔가를 시작하기 전에는 지름신을 영접하는 것이 당연한(?) 의례인지라
이 때도 지름신을 조금 과하게(…ㅠ.ㅠ) 영접했다.


맥미니 5대…


그리고는 한 동안은 신났다.
Hadoop 설치하고 zookeeper 설치하고 Hbase 설치하고…
그리고…샘플 한 번 돌려보고? 끝이었나?…ㅠ.ㅠ


목표로 삼았던 것이 twitter의 데이터를 수집해서 이것 저것 분석하는
공부를 좀 해보고자 했는데…
이게 당최 감이 안잡히는 것이다.


twitter API를 통해 데이터를 가져오는 것이야 알겠는데…
이걸 어떻게 저장을 해야하는지…


처음에는 그냥 REST API를 이용해서 데이터를 가져오고
무작정 일반 txt 파일로 저장을 했다.
그러다가 Streaming API를 이용하게 되었는데 아무리 샘플 데이터라
하더라도 연속적으로 들어오는 데이터를 어떻게 저장을 해야 할지,
그리고 간단하게나마 가공을 하고자 하는데 가공을 하는 중 계속해서
들어오는 데이터는 어떻게 처리를 해야 하는지에 대한 해결책을 
전혀 몰랐다.



그렇게 3년이 지나버린 것이다.
그리고 우연히 kafka를 발견한 것이다.
프란츠 카프카도 아니고 해변의 카프카도 아니고 apache Kafka!


일단 2가지 측면에서 관심을 가지게 되었는데 하나는 Use Case 중 
Stream Processing이고 다른 하나는 Client에 node.js가 있다는 점이었다.


처음부터 twitter API를 이용하는데 node module을 사용한지라 계속해서
node module을 사용할 수있다는 것은 나에게는 큰 매리트였다.



아직은 대충 훑어본 정도라 과연 나의 목적에 딱 맞는지는 잘 모르겠으나
이 부분은 계속 적용을 해 나가면서 구체적으로 정리를 해야겠다.


당분간 주말에 할 것이 생겨서 즐겁네

저작자 표시
신고

하둡 설치 후 꽤 오랜 시간이 흘렀다.

그간 회사 업무가 바쁜 것도 있었지만 엄청나게 간단하다면 간단한 zookeeper와 HBase의 설치와 설정에서

생각 외로 많은 시간을 잡아먹었다.


그사이 Streaming API를 통해 축적한 트위터 데이터는 53Gb에 이르고 있다.

얼른 HBase를 설치하여 담아야 하는데…


사실 zookeeper와 HBase의 경우 너무서 설치와 설정이 간단해서 오히려 많은 자료들을 참조한 것이 

더 혼란을 가져왔다. 디테일한 차이가 얼마나 영향을 끼치는지 모르는 상황에서는 이것저것 다 해볼 수밖에

없기에 시간도 그만큼 많이 걸리고…


암튼 시행착오의 역사를 적는다…-.-


1. zookeeper를 설치하지 않다!


우선 HBase 완벽가이드를 참조해서 설치를 시작했는데…이 책이 완벽하지 않은 것인지 내가 띄엄띄엄 읽은 것인지

이 책의 설치 관련 챕터에는 zookeeper 설치에 대한 이야기가 없다.


띄엄띄엄 읽은(이편이 신빙성 있겠지…-.-) 나는 덜렁 HBase만을 설치한 후 설정과 실행단계에까지 진행을 했다.

물론 정상 실행될리가 없다.


jps를 이용해 실행되고 있는 프로세스들을 확인해본 결과 정상 상태라면 QuorumPeerMain이 떠있어야 하는데

나는 MasterQuorumPeer던가 하는 이상한 놈이 떠있었고 나는 그저 속편하게 내 주키퍼의 버전이 최신이어서

그런가보다 하고 엉뚱한 곳에서 원인을 찾고 있었다.


2. zookeeper를 설치하였으나 zookeeper가 실행이 안된다!


결국 우여곡절 끝에 zookeeper를 별도로 설치해야 한다는 것을 알았다.

그래서 설치를 하고 zoo.cfg도 잘 설정을하고 실행을 시켰으나…

역시나 실행이 안된다.


로그를 봐도 뭔가 접속 거부를 당했다거나, 타임아웃에 걸렸다거나…

어쨌든 뭔가 노드간에 통신이 안되고 있다는 짐작을 할 수는 있는 내용들이었으나

정확한 원인을 찾기가 쉽지 않았다. 언제나 그렇듯이 '간단한' 설정임에도 안되는 것이 생기면

무척 당황하기 마련이다.


그리고 이럴 땐…

그저 처음부터 차근차근 다시 해보는 것이 상책이다.


결국 밝혀낸 원인은 hbase.zookeeper.property.dataDir에 설정해놓은 주키퍼의 데이터 디렉토리에

생성되는 myID 파일의 값과 zoo.cfg에 설정한 주키퍼 서버 정보가 맞지 않아서 발생한 문제들이었다.


아무 생각 없이 실행시킨 주키퍼는 master 노드의 myID값을 0으로 설정하였다.

그런데 인터넷에 있는 대다수 설치 방법 안내에 보면 zoo.cfg에 다음과 같이 설정하고 있다.


#Zookeeper Servers
server.1=NAMENODE.local:2888:3888
server.2=SECONDARY-NAMENODE.local:2888:3888
server.3=DATANODE1.local:2888:3888

server.4=DATANODE2.local:2888:3888 

server.5=DATANODE3.local:2888:3888 


그리고 server.x의 x 값이 myID 파일의 값과 일치해야 한다고 설명하고 있다.

그래서 master 노드의 myID 값을 1로 바꾸고 나머지 노드들에 대해서도 순서대로 myID 파일의 값을

변경을 해주었다.


하지만 변한게 없었다.

또 몇칠을 허비한 끝에 우연히 master 노드의 myID 값은 내가 정해준 그대로 1로 되어있는데

슬레이브 노드들에서 myID 값들이 master가 0인것으로 인식되어 1, 2, 3, 4의 값들이 들어가있는 것이었다.


결국 시스템의 고집에 굴복하여 xoo.cfg의 설정을 다음과 같이 바꾸고 master의 myID도 0으로 바꾸었다.


#Zookeeper Servers
server.0=NAMENODE.local:2888:3888
server.1=SECONDARY-NAMENODE.local:2888:3888
server.2=DATANODE1.local:2888:3888

server.3=DATANODE2.local:2888:3888 

server.4=DATANODE3.local:2888:3888


그리고 드디어 zookeeper가 정상적으로 로드되었다.

zkCli.sh를 통한 테스트도 정상적으로 이루어졌다.



 3. 마지막 관문! HBase!


사실 HBase는 주키퍼 설치 이전에 상당부분 설정을 해 놓았기에 별 생각없이 실행을 시켜보았는데

역시나 정상적으로 실행이 안된다.


몇번의 시도 끝에 주키퍼가 제대로 로드되지 않아서 그런 경우를 몇번 확인하고

마지막으로 주키퍼가 정상적으로 로드된 상태에서 마지막 시도를 했는데 드디어 HBase도 정상적으로 올라왔다.

그런데 60010포트의 웹 관리 화면도 뜨지를 않고 또 hbase shell로 들어가 status 명령을 내려보니 다음과 같은

오류 메시지가 뜬다.


ERROR: org.apache.hadoop.hbase.masternotrunningexception retried 7 times


요건 뭘까…하고 또 한참을 고민하다가 hbase의 로그를 보니 다음 내용이 자꾸 올라간다.


Wating for dfs to exit safe mode…


역시 짐작대로 hadoop이 safe mode로 들어가있어서 발생한 문제였다.


hadoop dfsadmin -safemode leave로 safe mode를 빠져나가자…

드디어 HBase가 정상 작동하기 시작했다.


4. 로그 보기의 어려움


HBase와 zookeeper를 설치하다보니 로그를 많이 참조해야 했는데 어쩐 이유에서인지 ERROR 처리에

매우 인색하다는 느낌을 받았다. 대부분의 로그가 내용상은 Exception이 출력되더라도 level은 INFO나

WARN 수준이 대부분이고 가뭄에 콩나듯 ERROR가 보였다.


그러다보니 분명 Exception인데…이걸 해결하고 가야 하는 것인지 그냥 무시하고 넘어가도 되는 것인지

판단이 쉽지 않았다.


아마도 네트워크를 통해 멀티 노드를 관리하는 시스템이다보니 일시적으로 통신 장애등이 발생했다고 해서

그것을 error로 처리하기에는 무리가 있어서 그런 것이 아닐까 추측해본다.


그리고 그에 걸맞게 대부분의 문제는 노드간의 통신이 안되는 것, 특히 특정 노드에 프로세스가 로드되지 않아

발생하는 문제가 거의 대부분이었다는 것이다.


특히나 jps나 ps를 통해 프로세스를 확인하는 경우에도 분명 해당 프로세스가 보이는데 실제로는

서버가 구동되지 않는 상태인 경우가 많아 더욱 판단을 어렵게 하였다.


이런 부분만 조심한다면 사실 설치와 설정, 실행은 어렵지 않는 시스템인 것은 분명하다.


5. 다음 단계로


맥미니 5대 모으고 처음 Hadoop을 설치하여 구동시켜보고는 좋아라 하던 때가 엊그제 같은데…

벌써 해를 넘기려고 하고있다.


앞서 언급했듯이 샘플 데이터로 모으고있는 트위터 데이터도 53Gb 정도 모였고(물론 너무나 부족한 양이지만)

이제는 Map/Reduce와 HBase관련 실전 프로그래밍에 들어가야 할 시점에 이르렀다.


두 권의 완벽 가이드(Hadoop 완벽 가이드, HBase 완벽 가이드)를 통해 마지막 단계로 진입을 해야 할

시점이 된 것이다.


또 얼마나 헤매게 될지 걱정이 앞서지만 부딪쳐봐야 아는 것이고

현재 개발 중인 아이폰 앱 개발과 중복해서 하려면 시간이 만만치 않게 들게 될 것이

걱정이라면 걱정이다.

저작자 표시
신고
  1. Jung 2013.12.20 01:37 신고

    일전에 네이버에서 쪽지로 질문드렸던 석사생입니다^^
    맥미니를 2대 더 구하게 되어 총 3대로 설정하려합니다.
    마즈다님이 쓰신 빅데이터 관련 글을 보며 많은 도움 얻고 있습니다^^

  2. 마즈다 2013.12.30 18:59 신고

    그냥 마구잡이로 쓰는 글들이 도움이 되신다니 한편 감사하고 한편 걱정되기도 하네요.
    앞으로 좀더 정확한 자료에 바탕을 두고 글을 쓰도록 노력하겠습니다.
    감사합니다…^^

  3. LDW 2014.03.31 17:39 신고

    안녕하세요 . 저도 Hadoop+Zookeeper+Hbase 환경 구성을 하고 있는데요..
    음 .. 모든 프로세서는 정상적으로 올라갑니다 .. 근데 Hbase shell 실행 시 status를 하면 server가 0이 나오는데요.
    아마도 HRegionServer가 HMaster와의 연결이 안되는 것 같은데 구글링해서 할 수 있는 모든 건 다 해봤습니다.
    참고로 Ubunu라 /etc/hosts 설정도 바꿔보았고....뭐 아무튼 머리가 너무 아프네요 ㅠ
    혹시 이 문제에 대해서 알고 계신게 있는지 궁금합니다..

    • 마즈다 2014.05.07 18:22 신고

      한동안 회사 업무에 치어 블로그를 관리하지 못했더니 글이 올라와 있었네요...이 글을 보실지 모르겠지만 답변이 늦어 죄송합니다.

      그렇다고 문제 해결을 위한 답변을 드리긴 어려울 것 같구요.
      우선 중요한 것은 역시나 로그를 잘 살피는 일일 것 같습니다.
      하둡, HBase, Zookeeper까디...찾아볼 로그들이 좀 많죠...
      저도 한~두달 지나고 나니 기억이 가물가물 하네요...
      제대로 된 답변을 못드려 죄송합니다.

+ Recent posts

티스토리 툴바