Cluster : The Beginning - Apache Kafka와 EMQ 연동


HAProxy, EMQ, Kafka 설치를 통해 일단 입구는 마련이 되었다…고 생각했으나 아직 한 가지 남은 것이 있었다.
단순한 메신저 기능이나 M2M 통신만을 위한 것이 아니라 데이터 분석이 목적이라면 EMQ를 통해 들어온 데이터를 
저장해야 할텐데 아직 그 기능이 구현되지 않았다.


EMQ의 토픽을 영구 저장소에 저장하는 다른 방법이 있는지는 모르겠으나 나의 경우 일단 Kafka가 설치되어있고
곧 Hadoop과 HBase를 설치할 예정이기에(2018년 1월 2일 현재 설치 완료됨) 다음과 같은 경로로 저장을 하기로
했다.




그리 어렵지 않은 내용이니 간단하게 소개하고 마치도록 하겠다.


1차시기 실패 : emqttd_plugin_kafka_bridge


우선 처음 검색해서 찾아낸 것이 emqttd_plugin_kafka_bridge란 놈이었다. 이름으로 알 수 있듯이 EMQ와의
연동을 고려하여 만들어진 것 같은데…어쩐 일인지 빌드가 되지 않았다. 우선은 라즈베리파이라서 그런가보다 하고
넘어 가고 다른 대안을 찾기 시작했다. mqttKafkaBridge와 같은 몇몇 도구들도 찾아보았으나 라즈베리파이에서는
잘 실행이 안되었다 (아래 링크는 시도를 해보았거나 검색 결과로 찾은 도구들이다). 



결국 아래 링크된 페이지를 참조하여 kafka에 있는 connect-standalone.sh를 이용하여 연동하는데 성공하였다.


https://howtoprogram.xyz/2016/07/30/apache-kafka-connect-mqtt-source-tutorial/


connect-standalone.sh 이용하기


우선 나의 경우 위에 링크한 페이지를 참고로 하여 mqtt 연동을 위한 설정은 mqtt.properties 파일에 
아래와 같이 하였다. 이 파일은 $KAFKA_HMOE/config에 위치해야 한다.

# 이 커넥터의 이름과 커넥터로 사용할 클래스 그리고 task의 최댓값을 설정한다.
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1

# kafka쪽에서 사용할 토픽 이름이다.
kafka.topic=mqtt-kafka

# mqtt에 연결할 kafka broker의 아이디이다. 나머지 2대의 kafka broker에는 각각
# mqtt-kafka-2, mqtt-kafka-3으로 설정이 되어있다.
mqtt.client_id=mqtt-kafka-1

# 연결 관련 설정으로 위에 링크한 참조 페이지 내용을 그대로 사용하였다.
mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60

# MQTT 관련 설정. mqtt.server_uris는 나의 경우 EMQ를 노드 2개의 클러스터로 구성하였고
# 부하 분산을 위해 HAProxy가 설치된 서버 주소로 설정하였다. 그리고 EMQ에서 사용할 토픽은
# /mqtt로 설정하였다.
mqtt.server_uris=tcp://172.30.1.23:1884
mqtt.topic=/mqtt


이와 같이 설정한 후 connect-standalone.sh를 다음과 같이 실행한다(물론 이미 zookeeper와 kafka는 
실행이 된 상태여야 한다).

$ cd $KAFKA_HOME/bin
$ ./connect-standalone.sh config/connect-standalone.properties config/mqtt.properties


connect-distribute.sh도 있는데 standalone과 어떤 차이가 있는지 잘 모르겠다. 추후 다시 확인을 해봐야겠다.


연동 결과


connect-standalone.sh를 실행하고 나면 잠시 후 EMQ의 웹 화면에 아래와 같이 mqtt.properties에 설정한
mqtt.client_id 값들이 메뉴 곳곳에 보이는 것을 확인할 수 있다.


EMQ에 연결된 클라이언트로 mqtt.properties에 설정한 클라이언트 Id가 보인다.

현재 연결된 세션에도 역시 클라이언트 Id가 보인다.


마찬가지로 구독자 목록에도 클라이언트 Id가 보이며 구독하는 토픽은 mqtt.topic에 설정해준 값인 /mqtt
보인다.


마지막으로 Kafka Manager에서 Topic List메뉴를 보면 mqtt.properties의 kafka.topic 항목에 설정한
mqtt-kafka가 보이는 것을 확인할 수 있다.




정리


앞서 정리한 내용과 마찬가지로 세세한 부분까지 확인하지는 못했지만 우선 Kafka와 EMQ를 연동하는 것 까지
작업을 완료 하였다. 이제 EMQ -> Kafka -> Hadoop(HBase)로 이어지는 프로세스에는 아두이노로 만들
온습도계의 데이터를 저장할 것이고 Kafka Producer -> Kafka -> Hadoop(HBase)로 이어지는 프로세스에는
일반 텍스트 데이터를 저장하게 될 것이다.


이렇게 해서 데이터 컬렉션을 담당할 부분은 모두 설치가 완료가 되었다.
다음 시간부터는 데이터 저장 및 분석을 위한 Hadoop과 HBase 설치에 대한 내용을 정리하고 그 이후 데이터의 
실시간 분석 및 머신 러닝을 위한 Apache Spark 설치에 대한 내용으로 마무리를 하도록 하겠다.


어느덧 해가 바뀌었고 다행히 설치 자체는 순조롭게 진행이 되고 있다.
부디 늦지 않게 데이터가 수집되고 수집한 데이터를 분석하는 것 까지 정리 할 수 있기를 바란다.


이 글을 보고 계시는 분들께 새해 인사 전하면서 이만 줄인다.


무술년 한 해 모두 행복하시기를 바랍니다.
새해 복 많이 받으세요!!! 






  1. 2018.01.11 01:02

    비밀댓글입니다

    • 2018.01.11 08:47

      비밀댓글입니다



소스 정리와 모니터링 툴


소스 정리

일단 급하게 기능을 확인하다보니 소스 코드가 엉망이다.
조금이나마 다듬어야 보기가 편할 것 같아 쉬어갈 겸 
우선 node 소스들을 정리했다.


tmgetter.js 
(트위터 메시지를 받아서 콜백 함수를 통해 topic으로 메시지를
보내는 모듈)

var Twitter = require('twitter');
var count = 0;


var client = new Twitter({
  consumer_key: '...',
  consumer_secret: '...',
  access_token_key: '...',
  access_token_secret: '...',
});

var msgArr = new Array();

/**
*  parameter
* msgCount : kafka 토픽으로 보낼 메시지 배열의 수
* sendMessage : kafka 토픽으로 메시지를 보낼 콜백 함
**/
var getTwitterMessage = function(kafka, msgCount, sendMessage) {
   /**
   * Stream statuses filtered by keyword
   * number of tweets per second depends on topic popularity
   **/
   client.stream('statuses/sample', {track: ''},  function(stream){
      stream.on('data', function(tweet) {
        if (tweet.lang == 'kr' || tweet.lang == 'ko') {
          if (count % msgCount == 0) {
            // 메시지 배열에 값이 들어있는 경우 초기화 시
            if (msgArr.length > 0) {
              while(msgArr.length > 0) {
                msgArr.pop();
              }
            }
          }

          var messageStr = extractMessage(tweet);

          msgArr.push(messageStr);

          if ((count % msgCount) == (msgCount - 1)) {        
            if (typeof sendMessage === 'function') {
              var Producer = kafka.Producer;
              var Client = kafka.Client;
              var client = new Client('NAMENODE.local:2181,SECONDARY-NAMENODE.local:2181,DATANODE1.local:2181,DATANODE2.local:2181,DATANODE3.local:2181');

              var topic = 'twittopic';
              var producer = new Producer(client, { requireAcks: 1 });

              sendMessage(producer, msgArr, count);
            }
          }
          count++;
        }
      });

      stream.on('error', function(error) {
        console.log(error);
      });
    });

}

// 트위터 메시지에서 필요한 부분만 추출하는 함수
var extractMessage = function(tweet) {
  var messageObj = {};

  messageObj.createdAt = tweet.created_at;
  messageObj.id = tweet.id;
  messageObj.idStr = tweet.id_str;
  messageObj.text = tweet.text;
  messageObj.UserScreenName = tweet.user.screen_name;
  
  messageObj.retweetCreatedAt = '';
  messageObj.retweetId = '';
  messageObj.retweetIdStr = '';
  messageObj.retweetText = '';
  messageObj.retweetUserScreenName = '';
  messageObj.retweetCount = '';
  
  if (tweet.retweeted_status) {
    messageObj.retweetCreatedAt = tweet.retweeted_status.created_at;
    messageObj.retweetId = tweet.retweeted_status.id;
    messageObj.retweetIdStr = tweet.retweeted_status.id_str;
    messageObj.retweetText = tweet.retweeted_status.text;
    messageObj.retweetUserScreenName = tweet.retweeted_status.user.screen_name;
    messageObj.retweetCount = tweet.retweeted_status.retweet_count;
  }
  
  return JSON.stringify(messageObj)
}

exports.getTwitterMessage = getTwitterMessage;

sender.js
(트위터 메시지를 topic에 전달할 모듈)

var sendMessage = function(producer, msgArr, count) {

  console.log('==start sendMessage=================================================================');

  producer.on('ready', function () {
    console.log('-> producer ready');

    var p = count % 3;
    producer.send([
        {topic: 'twittopic', partition: p, messages: msgArr, attributes: 0 }
      ], function (err, data) {
          if (err) {
            console.log('-> Error : ' + err);
          }
          else {
            console.log('-> send %d messages', count);
            console.log('-> send to %d Partition', p);
          }

          producer.close(function() {
            console.log('-> close produce');
            console.log('==end sendMessage===================================================================');
          });
    });
  });

  producer.on('error', function (err) {
    console.log('error', err)
  });
};

exports.sendMessage = sendMessage;

index.js
(실행 모듈)

var kafka = require('kafka-node');

var tmgetter = require('./tmgetter');
var sender = require('./sender');
// producer를 별도의 모듈로 만들고 오브젝트를 생성하여 사용하고자 했으나
// 이렇게 하면 무슨 이유인지 producer가 작동하지 않는다.
//  producer의 ready 이벤트가 어떤 조건에서 발생하는지 모르겠다...ㅠ.ㅠ
//var producer = require('./producer');


tmgetter.getTwitterMessage(kafka, 20, sender.sendMessage);

일단 이렇게 소스는 조금 정리를 했다.
Producer쪽 로그도 조금 깔끔하게 다듬었다.



모니터링 툴

Kafka Manager


비교적 다양한 정보를 볼 수 있는 장점이 있다. 물론 그만큼 설정이
복잡하다. 물론 기본적인 설정만으로도 웬만한 정보는 확인 가능하며
지난 포스팅에 언급한 것 처럼 JMX 기반으로 정보를 확인하고자 하면
kafka 서버 실행시 다음과 같이 JMX 포트를 명시해주어야 한다.


env JMX_PORT=9999 ./kafka-server-start.sh ../config/server.properties


설치 및 실행은 지난 포스팅에 링크한 github 페이지를 참고하도록 하고
오늘은 화면 설명만 간단하게 하겠다.


클러스터 설정 화면


일단 가장 기본적인 Cluster Name을 정해주고
Cluster Zookeeper Hosts설정을 해준다. 제목대로 Zookeeper 노드의 
주소와 포트를 넣어주면 된다. 다음 Kafka Version을 선택하고 마지막으로 
JMX 관련 설정을 하면 기본적인 설정은 끝이다. 나머지 설정들은 기본값으로…
설정이 끝났으면 화면 맨 아래의 SAVE 버튼을 누르자.


설정을 저장하고나서 상단 Cluster메뉴에서 List를 선택하면
아주 심플한 화면에 방금 설정을 마친 클러스터 이름이 보인다.

이 클러스터 이름을 클릭하면 다음과 같은 화면이 나온다.


여전히 화면은 심플하지만 상단에 뭔가 메뉴가 잔뜩 생겼다.
사실 상단 메뉴의 대부분은 뭔가를 생성하거나 변경하는 내용들이라
아직 직접 사용해 보지는 않았다. 이제 친숙한 용어들을 눌러보자


예상했듯이 topic 목록이 나온다. 나는 토픽을 1개만 생성을 했기 때문에
하나의 항목이 보이고 보다시피 topic 이름은 ‘twittopic’이다.
내용을 보면 알겠지만 3개의 partition이 있고 5개의 broker가 구동중이며
3개의 복제본이 있다. 다른 항목들은 그냥 보면 되겠는데 
Brokers Skew %라는 것이 뭔지 도통 모르겠다.
topic 이름을 클릭해보자


뭔가 조금 복잡한 화면이 나왔지만 살펴보연 목록에 나왔던 내용들의

반복에 특정 시간 간격에서의 전송량 등 쉽게 알 수 있는 내용들이
표시된다. 특히 Partitions by Broker를 보면 topic의 각 partition이
전체 broker에 어떻게 복제가 되어있는지 알 수 있다.
여기도 역시 Skewed라는 항목이 있는데 skewed의 사전적 의미에
‘왜곡된’, ‘편향된’ 이라는 의미가 있고 Skewed가 id 2번의 브로커만
true이면서 3개의 partition의 복제본이 모두 존재하는 상황으로
미루어보건데 partition의 복제본이 정도 이상으로 집중된 상황을
표시하는 것 같다. 만일 복제본을 5개로 설정했다면 전체 Broker가
모두 Skewed 값이 true가 되고 Broker Skewed %도 100%가
되지 않알까 싶다. 요건 추후 테스트해봐야겠다.


만일 JMX를 사용하지 않게 되면 Metrics 영역에 보이는 회색 타원
영역의 값들이 표시되지 않는다.


Consumers consuming from this topic는 Consumer 그룹을
보여준다. 나는 하나의 컨슈머 그룹만 설정을 했고 그 이름을
kafka-node-group으로 했기에 하나만 보여지고 있다. 한 번 눌러보자


현재 topic partition이 3개이고 consumer가 하나의 그룹에 3개의
인스턴스로 존재하기 때문에 각각의 partition과 consumer는 1:1로
연결이 된다.


다시 돌아가서 이번에는 Broker를 한 번 확인해보자. 현재 3개의 
복제본이 모두 저장되어있는 2번 Broker를 클릭해보겠다.


역시 조금 복잡해보이지만 앞의 정보들이 반복해서 보여진다.
처음으로 그럴듯한 챠트가 하나 나왔다. 시간 흐름에 따라 처리되는
메시지의 양을 보여주는 그래프다. Per Topic Detail영역을 보면
토픽에 대한 정보를 볼 수 있는데 앞의 메뉴에서 이미 확인된 내용들이다.


이렇게 뭔가 화면이 많고 복잡해 보이기는 하나 대부분 중복된
내용이 많다. 그리고 JMX를 이용하지 않을 경우 처리량에 해당하는 정보
(회색 타원 영역과 그래프 등)는 볼 수 없다.


Kafka Consumer Offset Monitor


이 툴은 이름에서도 알 수 있듯이 Cunsumer가 message를 소비하는데
초점이 맞춰져있다. 굉장히 심플하다. 별다른 설정도 필요 없다.
메인 메뉴는 다음과 같다.


Consumer Groups를 선택한 화면이다.
앞서 언급한 것처럼 나는 kafka-node-group이라는 하나의 그룹만을
가지고 있다.


kafka-node-group을 눌러보자



아주 깔끔하게 Broker 목록과 Consumer의 message 소비 상황이보인다.


전체 Broker는 5대의 node(Mac mini)에서 실행되고 있지만 
Consumer는 2, 3, 4번 Broker에서만 실행되고 있기 때문에 Broker
목록에 2, 3, 4번만 보인다.


Consumer Offsets 영역에는 partition 번호와 소비한 message
offset과 size 등의 정보가 보여진다. 그리고 topic 이름인 twittopic을
클릭하면 아래와 같은 그래프가 표시된다.



zoom에서 선택한 시간 동안의 message 소비의 변화량을 보여준다.


메인 메뉴의 2번째 메뉴인 Topic List 메뉴도 결국에는 이 화면으로 
들어오게 된다.


마지막의 Visualizations 메뉴는 클러스터의 구조를 트리 형태로
보여주는 메뉴이며 다음 2개의 화면을 제공한다.


이상과 같이 이번 포스팅에서는 간단하게 2개의 kafka 모니터링 툴에 대해
알아보았다. 아직 잘 모르는 항목들과 기능 버튼들이 있는데 이런 부분들은
추후 확인되는대로 내용을 보완해나가도록 하겠다.




Kafka 무작정 실행하기 - 2


58의 비밀

먼저 지난 번 마지막에 언급했던 58이란 숫자의 비밀을 밝혀보자.
사실 정확한 원인은 아직 확인 못했다. 다만 지난 번 코드의 구현이 ’트위터
메시지가 하나 들어올 때마다 producer 하나를 만들어 트위터 메시지를
topic에 보낸다’
는 것이었다.


이 과정에서 의심할 수 있는 것은 매번 producer를 만들어 커넥션울 하게 되니
아마도 이 커넥션 수에 제한이 걸려버린 것이 아닐까 하는 부분이었다.


그래서 일단 직감에 의존해 producer에서 topic으로 메시지를 보낸 후
API의 close 함수로 연결을 끊어보았다. 예상이 적중하였는지 이후로는
58개의 제한에 걸리지 않고 트위터에서 받아오는 모든 메시지들이
정상적으로 전송이 되었다.



성능 관리

겨우 이만큼의 구현과 설정을 해놓고 성능 관리를 논하는 것은 우스운 일이지만
일단 트위터 메시지 한 건에 커넥션 하나를 만들었다가 끊는다는 것은 아무리
생각해도 그리 바람직하지는 않다고 판단하였다. 


그리서 우선 아쉬운대로 트위터 메시지 10건이 수신되면 그 10개를 하나의
배열로 하여 producer를 통해 보내도록 하였다. 우선은 시험 삼아
이정도 처리를 한 것이고 시스템 환경에 따라 아마도 이러한 수치를 세밀하게

조절을 해야 할 것이다.


그리고 실제로 topic으로 메시지를 보내는 위치에서 파티션(나는 3개의 
partition 나뉜 1개의 topic을 만들었다)에 고르게 분산시키기 위해
파티션의 인덱스를 계산하는 식을 하나 추가하였다.


  • 처음에는 손이 덜가는 HighLevelProducer를 사용했었으나 뭔가 잘
    작동하지 않는 듯하여 그냥 Producer로 바꾸었다.

수정된 소스

일단 지난 시간의 문제점을 수정하고 몇가지 필요한 내용을 추가한
소스를 다음과 같이 만들었다.

var kafka = require('kafka-node');


var Twitter = require('twitter');
var count = 0;


var client = new Twitter({
  consumer_key: ‘…’,
  consumer_secret: ‘…’,
  access_token_key: ‘…’,
  access_token_secret: ‘…’,
});

var msgArr;

/**
 * Stream statuses filtered by keyword
 * number of tweets per second depends on topic popularity
 **/
client.stream('statuses/sample', {track: ''},  function(stream){
  stream.on('data', function(tweet) {
    if (tweet.lang == 'kr' || tweet.lang == 'ko') {
      if (count % 10 == 0) {
        msgArr = new Array();
      }

      var messageObj = {};

      messageObj.createdAt = tweet.created_at;
      messageObj.id = tweet.id;
      messageObj.idStr = tweet.id_str;
      messageObj.text = tweet.text;
      messageObj.UserScreenName = tweet.user.screen_name;

      messageObj.retweetCreatedAt = '';
      messageObj.retweetId = '';
      messageObj.retweetIdStr = '';
      messageObj.retweetText = '';
      messageObj.retweetUserScreenName = '';
      messageObj.retweetCount = '';

      if (tweet.retweeted_status) {
        messageObj.retweetCreatedAt = tweet.retweeted_status.created_at;
        messageObj.retweetId = tweet.retweeted_status.id;
        messageObj.retweetIdStr = tweet.retweeted_status.id_str;
        messageObj.retweetText = tweet.retweeted_status.text;
        messageObj.retweetUserScreenName = tweet.retweeted_status.user.screen_name;
        messageObj.retweetCount = tweet.retweeted_status.retweet_count;
      }

      var messageStr = JSON.stringify(messageObj)
      console.log(messageStr);
      console.log('===================================================================' + count);

      msgArr.push(messageStr);

      if (count % 10 == 9) {
        console.log('==call sendMessage====================================================');
        var Producer = kafka.Producer;
        var Client = kafka.Client;
        var client = new Client('NAMENODE.local:2181,SECONDARY-NAMENODE.local:2181,DATANODE1.local:2181,DATANODE2.local:2181,DATANODE3.local:2181');

        var topic = 'twittopic';
        var producer = new Producer(client, { requireAcks: 1 });

        sendMessage(producer, msgArr, count);
      }
      count++;
    }
  });

  stream.on('error', function(error) {
    console.log(error);
  });
});

var sendMessage = function(producer, msgArr, count) {

  console.log('==sendMessage=================================================================');

  producer.on('ready', function () {
    console.log('==producer ready=========');
    console.log('send start');

    var p = (count - 9) % 3;
    producer.send([
      {topic: 'twittopic', partition: p, messages: msgArr, attributes: 0 }
    ], function (err, data) {
      if (err) console.log(err);
      else console.log('send %d messages', count);

      producer.close(function() {
        console.log('close produce');
      });
    });
  });


  producer.on('error', function (err) {
    console.log('error', err)
  });
};

모니터링

일단 내용 구현을 하고 실제 작동하는 것 까지 확인을 하였으니 이제제대로
뭔가를 하고 있는 것인지 궁금해졌다. 일단 Kafka 자체적으로는 달리 웹으로 
구현된 관리 화면을 제공하지 않기에 잘 알려진 모니터링 도구 2가지를 
설치해보았다.


Kafka Offset Monitor : http://quantifind.github.io/KafkaOffsetMonitor/

Kafka Manager : https://github.com/yahoo/kafka-manager


이중 Kafka Manager는 Kafka에서 제공하는 JMX 인터페이스를 이용하는
툴로 이 툴을 이용하기위해서는 Kafka 서버 시작 시 JMX 포트를 함께 지정해
주어야 한다.

env JMX_PORT=9999 ./kafka-server-start.sh ../config/server.properties


다음은 각 모니터링 툴의 실제 화면이다.

  • Kafka Offset Monitor


  • Kafka Manager

그리고 다음은 node.js를 통해 실행한 Producer와 Consumer의 콘솔 화면이다.

아직 Consumer에서는 별다른 메시지 가공은 없이 topic에서 가져온 메시지를
로그로 출력만 하고 있다.


ProducerConsumer1 (DATANODE1)

Consumer2(DATANODE2)Consumer3 (DATANODE3)

현재 구성이 topic이 3개의 partition으로 되어있고 3개의 Consumer가 하나의
Consumer 그룹(kaka-node-group : kafka-node에서 자동 생성한 그룹)에
3개의 Consumer가 있기 때문에 각각의 partition과 Consumer는 다음과 같이
1:1로 연결이 되고 있다.


Consumer1 - partition 0
Consumer2 - partition 2
Consumer3 - partition 1


다음 포스팅에서는 현재 내가 구성하고 있는 시스템의 구조를 설명하고
Consumer 중단 시에 rebalancing이 일어나는 모습을 코솔 로그를 통해
확인해보겠다.



Kafka 무작정 실행하기


참고한 자료들

지난 포스팅까지 Kafka의 개요를 알아보았다.
물론 수박 겉핥기 수준의 내용이었지만 더이상의 자세한 내용은
생략할 수밖에 없을 것 같다.


이미 많은 블로그에 보다 자세하고 정확한 내용들이 올라와있어
또다시 이 작업을 하는 것은 소모적인 일이 될 것 같아서이다.
게다가 내 짧은 지식으로는 잘못된 정보를 전달할 가능성도 높고…ㅠ.ㅠ


간단하게 참고한 블로그를 소개하자면 다음과 같다.


http://epicdevs.com/17
http://blog.embian.com/category/Apache%20Kafka
http://blog.jdm.kr/208
http://wiki.intellicode.co.kr/doku.php?id=개발:kafka


무작정 달려들기

일단 내가 kafka에 관심을 가지게 된 것은 twitter의 streamin API를
통해 가져오는 데이터를 1차 가공 후 HADOOP이나 HBase에 적재
하기 위해서였다.


어찌어찌 node.js 모듈을 구해서 twitter 샘플 데이터를 가져오는 것은
만들어놓았는데 문제는 적재 전에 가공을 하는 부분이었다.
대략 초당 2~3건 이상 들어오는 데이터를 바로바로 적재하는 것도
쉽지 않을 것 같은데 1차 가공을 해서 적재를 하려고 하니 난감했던
것이다. 그러던 차에 발견한 것이 kafka였고 얼핏 보기에 내 목적을 
충족 시켜줄 수 있을 것으로 판단되었다.


간단하게 개념을 익힌 후 기왕에 twitter 처리가 node.js 모듈로 구현
되었기에 kafka역시 node.js 모듈로 붙여보기로 했다. 
kafka 홈페이지의 클라이언트 메뉴에 있는 node.js 항목에서
kafka-node녀석을 사용하기로 했다. 이유는? 문서가 가장 잘 만들어져
있어서였다. 


(kafka-node : https://github.com/SOHU-Co/kafka-node/)


샘플 코드를 보고 구현을 해보았으나 node.js에 대한 지식도 부족한터라
생각처럼 잘 진행이 되지 않았다.


Node.js에서 막히다…

내가 구상했던 것은 당연히 다음의 진행이다.


  1. twitter 데이터 가져오기 
  2. Producer에서 메시지(1번의 twitter 데이터) 전달 
  3. topic에 적재
  4. Consumer에서 메시지 가공

그런데…
Node.js의 비동기 처리가 날 당황스럽게 했다.


우선은 기존의 twitter streaming API 처리를 하던 js에 kafka-node의
producer 샘플을 그대로 추가했다. producer는 비교적 설정이
단순한 HighLebelProducer를 사용하였다.
최초의 코드는 다음과 같이 구현되었다.


var kafka = require('kafka-node');
var Twitter = require('twitter');
var count = 0;


var client = new Twitter({
  consumer_key: '...',
  consumer_secret: '...',
  access_token_key: '...',
  access_token_secret: '...',
});

/**
*  Stream statuses filtered by keyword
*  number of tweets per second depends on topic popularity
**/
client.stream('statuses/sample', {track: ''},  function(stream){
  stream.on('data', function(tweet) {
      if (tweet.lang == 'kr' || tweet.lang == 'ko') {
        var HighLevelProducer = kafka.HighLevelProducer;
        var Client = kafka.Client;
        var client = new Client('NAMENODE.local:2181,\
            SECONDARY-NAMENODE.local:2181,\
            DATANODE1.local:2181,\
            DATANODE2.local:2181,\
            DATANODE3.local:2181');
        var topic = 'twittopic';
        var producer = new HighLevelProducer(client);
        console.log(tweet);
        console.log('===================================' + count);

        sendMessage(producer, tweet, count);

        count++;
      }
    });

    stream.on('error', function(error) {
    console.log(error);
  });
});

var sendMessage = function(producer, message, count) {
  console.log('==sendMessage=========================');

  producer.on('ready', function () {
    console.log('==producer ready=========');
    setInterval(send, 500);
  });

  function send() {
    console.log('send start');
    producer.send([
        {topic: 'twittopic', messages: [message.id] }
      ], function (err, data) {
      if (err) console.log(err);
      else console.log('send %d messages %s', count, message.id);
    });
  }

  producer.on('error', function (err) {
    console.log('error', err)
  });
};


이렇게 구현을 하는 동안 다음의 문제에 막혔다.


  • producer 객체를 언제 생성해야 하는가?

우선은 당연히 var kafka = require('kafka-node'); 이후에 이어서
관련 객체들을 생성해야 한다고 생각해서 그렇게 진행을 했더니
producer가 전혀 작동을 하지 않았다. 왜인지 아직도 원인은 모르겠다.


이후 twitter 데이터를 가져오는 부분이 
‘stream.on('data', function(tweet)’이고, ’data’이벤트가 발생하면 
콜백함수가 실행되고 이 콜백함수의 파라미터로 twitter의 데이터가 
전달된다. 따라서 나는 데이터가 전달되는 이 위치에서 producer 처리를 
해야 한다고 판단했다. 그래서 현재와 같이 구현을 하였고 일단 동작은
하였다.


  • setInterval의 있고 없음의 차이

보면 알겠지만 sendMessage 함수 내에 구현된 producer역시
‘ready’ 이벤트 발생시 콜백함수가 실행되는 구조이다. 그리고
이 안에 실제 메시지를 보내는 함수를 호출하는 코드가 구현되어있다.


그런데 이 구현에서 setInterval을 사용을 하게 되면 twitter의 콜백이
한 번 호출할 때마다 0.5초 동안 send 함수가 수십차례 호출이 되는
전형적인 비동기 호출의 문제에 빠졌다.


앞서 말했듯이 나는 twitter 메시지 한개가 들어오면 producer에서
그 메시지 1번만 보내는 것을 원했다.


다행히 아주 직관적으로 setIntervar을 제거하니 얼추 동기화가 되었다.


  • 58 제약?

일단 setInterval을 제거하고 호출을 하니 동기화는 되었는데
또다른 문제가 발생을 하였다. 58건의 메시지를 보내고 나면 더이상
메시지의 send가 이루어지지 않는 것이었다. 게다가 consumer도
하나만 메시지를 수신하고 partition 0번만 처리를 하고 있는 것이다.


혹시나 해서 setInterval을 다시 넣어보니 이 경우에도 0.5초동안
전송되는 횟수는 58회였다.


뭔지는 모르겠지만 이정도 수치에서 뭔가 제약이 걸리는 것 같은데
이유를 모르겠다(다음 포스팅에 해결책만 적겠다).


어쨌든 여기까지로 twitter로 메시지를 받고 producer로 topic에
쌓고 consumer로 메시지를 소비하는 것까지의 기본 동작은 확인을
했다. 


다음 포스팅에서는 이 소스를 약간 변형시켜서 애초에 구상했던
내용에 근접하는 동작을 구현한 코드를 가지고 이야기를 풀어나가자. 



Consumer


일반적으로 메시징 시스템은 queuing 기반 모델과 publish-subscribe 
기반의 모델로 나누어 볼 수 있다.


queuing기반의 메시징 시스템은 sender가 queue에 쌓아 놓은 메시지를 
pool에 있는 receiver중 하나에 각각 할당하는 방식이며 이 과정은 
비동기적으로 이루어진다. (point-to-point)


publish-subscribe기반의 메시징 시스템은 publisher가 메시지를 생성한 후
subscriber에게 broadcasting해준다.


Kafka는 consumer group이라는 개념을 만들어 이 두가지 방식을 종합하고 있다.


토픽으로 발행된 메시지들은 분산된 프로세스나 장비에 있는 consumer 그룹 내의 consumer 중 

오직 하나에게만 전달된다.



이 때, consumer들이 1개의 그룹으로만 묶여있으면 전통적인 queuing 방식으로 작동하고 

( consumer group = consumer pool) 모든 consumer가 서로 다른 그룹에 있으면 

publish-subscribe방식으로 작동하여 메시지들이 전체 consumer에게 broadcasting된다.


아래 그림에서 Consumer Group A가 없다고 생각해보자 Server 1과 Server 2에 있는 각각의 

topic partition들은 Group B 내의 consumer instance들과 1:1로 대응된다. 즉 point-to-point 방식의 

queuing이 되는 것이다.



하지만 그림과 같이 2개의 consumer 그룹이 있으면 각각의 partition들은 각 그룹으로 broadcasting을 

하게 되는 것이다.



이미지 출처1


간단하게 말하자면 이 구조는 구독자가 consumer의 클러스터로 되어있는 경우에는 결국 publish-subscribe 모델과 

다를바가 없다고 볼 수 있다.


Kafka는 전통적인 메시지 시스템보다 강력한 순차적 처리를 보장한다.


전통적인 queue 역시 메시지를 서버에 순차적으로 저장하고 다수의 consumer가 queue로부터 메시지를 사용할 때
저장된 순서에 따라 consumer들에게 메시지를 전달하지만 메시지 전달이 비동기적으로 일어나기 때문에 서로 다른 
consumer에 순서에 맞지 않는 메시지가 도착할 수도 있다.


이러한 동작은 사실상 병렬로 메시지가 사용 됨으로 인해 메시지 정렬의 의미가 퇴색하는 것이다.


종종 이런 상황을 회피하기 위해 ’배타적 consumer’라는 개념을  두어 queue에서 메시지를 읽을 때 단일 프로세스만 

사용하도록 하는 경우도 있는데 이는 당연히 병렬 처리를 못하게 되는 결과를 가져온다.


Kafka는 순차적인 처리를 더 잘 해낼 수 있다. topic 내에 partition이라는 병렬 처리의 개념을 둔 것이다.
이로인해 Kafka는 순처적인 처리에 대한 보장과 pool로 구성된 consumer들의 처리에 대한 load balancing을 

모두 제공할 수 있다.


이전 내용의 그림을 다시 보자


이미지 출처2


이러한 이점은 topic 내의 partision들을 consumer 그룹에 할당을 하고 각각의 partition들은 그룹 내에서 반드시 

하나의 consumer에 의해서만 사용되도록 함으로써 얻을 수 있다. 


이를 통해 어떤 consumer가 유일하게 특정 partition으로부터만 메시지를 소비하고 있으며 이로 인해 partition에 

정렬된 순서에 따라 메시지를 소비하고 있다는 것을 확신할 수 있는 것이다. 게다가 이런 과정이 진행되는 동안 

부하의 균형이 유지될 수 있다. 다만 consumer 그룹 내에 partition보다 많은 consumer들이 있게 되면 노는 

consumer들이 발생하니 주의가 필요하다.


Kafka는 동일 partition 내에서만 순서를 보장한다. 위의 그림에서 보자면 C3의 경우 P0와 1:1로 연결되어있기 때문에
순차적인 처리를 확실하게 보장 받을 수 있지만 C1의 경우 P0와 P3의 메시지를 함께 받고 있으며 이 두 개의 partition을
포괄하는 순차 처리는 보장 받을 수 없게 되는 것이다.



만일 전체적인 범위에서의 메시지 정렬이 필요하다면 topic이 단지 하나의 partition만을 갖도록 하여 처리할 수 있디.
하지만 이럴 경우 consumer 그룹 당 하나의 consumer 프로세스만 있게 되는 것이다.


Guarantee

높은 수준의 Kafka 구현은 다음과 같은 내용을 보장한다.


  • 메시지는 보내진 순서대로 추가된다. 즉 동일한 producer에서 보내진 message M1과 
    message M2가 있고 M1이 먼저 보내졌다면 M1이 M2 보다 낮은 offset을 갖고 로그에도 먼저 나온다.
  • consumer 인스턴스는 로그상에 정렬된 순서대로 메시지를
    인지한다.
  • N개의 복제 팩터를 가진 topic에 대해 메시지 손실 없이 N-1대의
    서버 장애를 허용한다.
  1. 이미지 출처 : Kafka 공식 홈페이지 (http://kafka.apache.org/documentation.html#introduction↩︎
  2. 이미지 출처 : Kafka 공식 홈페이지(http://kafka.apache.org/documentation.html#introduction↩︎


Distribution

topic의 partition들은 Kafka 클러스터를 구성하는 서버들에 분산 저장이 된다.
partition들은 내고장성을 위해 여러 서버에 복제되며 복제되는 서버의 수를 설정
할 수 있다.


각 partition은 1대의 leader 서버와 0대 이상의 follower 서버들로 구성된다.

(즉, leader 서버 1대로도 Kafka 사용이 가능하다.)

leader는 읽기 쓰기가 모두 가능하고 follower들은 leader의 데이터를 복제한다.


만일 leader가 고장나면 follower 중 한 대를 leader로 선출한다.
이런 구조로 클러스터 내에서의 부하가 적절히 분산된다.


지난 포스팅 (Apache kafka 시작하기)에서의 기억을 더듬어보면
일단 클러스터 내의 복제 서버는 총 5대로 설정되어있다. zookeeper의
server.0 ~ server.4 설정과 Kafka server.properties에서의
zookeeper.connect 설정에 맥미니 5대의 host name이 설정된 것을
확인 할 수 있다.


그리고 이 상태에서 다음과 같이 토픽을 생성할 때 처음에 오류가 발생을 했었다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --create --zookeeper SECONDARY-NAMENODE.local:2181 --replication-factor 5 --partition 20 --topic test 


원인은 SECONDARY-NAMENODE.local은 leader가 아니었고 그렇기 때문에
복제만 가능하고 write가 불가능했던 것이다. 
결국 leader인 NAMENODE.local을 이용하여 topic 생성에 성공했다.


그리고 이렇게 생성된 topic은 5대의 서버의 로그 디렉토리 
(server.propertiesd에 log.dirs로 설정된 위치 - 나의 경우 /tmp/kafka-logs)에
다음과 같이 뭔가 생성된 것을 확인할 수 있었다.



Producer

Producer는 선택한 topic으로 데이터를 발행한다.
Producer는 메시지를 발행할 topic을 선택하는 책임을 지는데
전통적인 round-robin 방식을 이용하여 균등하게 발행을 하거나
아니면 메시지의 어떤 키를 이용하여 특정 partition에는 특정 키를 가진
메시지만을 보낼 수 있다.



Apache Kafka 시작하기


참조 사이트

http://kafka.apache.org
http://epicdevs.com/20

다행이 예전에 Hbase를 설치할 때 zookeeper를 설치해놓은 덕에 kafka를 설치하고 구동하는 과정은 

그리 어렵지 않았다.아다시피 요즘 OS가 Windows만 아니면 binary 패키지를 다운로드 받고 적절한 위치에 

압축 풀고 하면반은 된 것이나 다름 없다.


kafka의 경우 $KAFKA_HOME/config/server.properties에 몇가지 설정만 한 후 기동시키면 된다.
다음은 내 PC 환경에서의 설정 및 구동 과정이다.


PC 환경

현재 맥미니 5대를 내부망으로 연결시킨 상태로 각각의 PC에 대한 정보는
/etc/hosts 파일에 설정이 되어있다. 대략 다음과 같다.

172.30.1.33     NAMENODE.local
172.30.1.56     SECONDARY-NAMENODE.local
172.30.1.59     DATANODE1.local
172.30.1.39     DATANODE2.local
172.30.1.11     DATANODE3.local

때문에 대부분의 설정에서는 host name인 NAMENODE.local 등을 이용한다. 이후
설정 사항에 이점 참고하길 바란다.


zookeeper 설정

zookeeper는 5대의 노드 모두 설치가 되어있으며 대략 다음과 같이 
설정이 되어있다. 설정 파일은 $ZOOKEEPER_HOME/cont/zoo.cfg이다. 
물론 5대의 노드 모두 동일한 설정이다.

dataDir=/zookeeper
clientPort=2181

#Zookeeper Servers
server.0=NAMENODE.local:2888:3888
server.1=SECONDARY-NAMENODE.local:2888:3888
server.2=DATANODE1.local:2888:3888
server.3=DATANODE2.local:2888:3888
server.4=DATANODE3.local:2888:3888

zookeeper의 실행 명령은 다음과 같다. 
(zookeeper 설치 위치는 zookeeper-3.4.5다

NAMENODE1:bin mazdah$ /zookeeper-3.4.5/bin/zkServer.sh start

kafka 설치, 설정, 구동

  • 설정

앞서 말했듯이 다운로드 압축해제 적절한 위치 복사면 모든 설치는 끝이다. 내가 받은 버전은 

kafka_2.11-0.9.0.1이며 /kafka에 설치했다.


우선 설정파일을 열고 다음과 같이 설정을 하였다. 설정 파일은 /kafka/config/server.properties다.
가장 먼저 설정해주어야 할 부분은 broker.id이다. 

나는 총 5대의 노드가 있기 때문에 각각의 PC(node)에 0부터 4까지 아이디를 부여해주었다.
예를들어 NAMENODE.local의 설정은 다음과 같다.

broker.id=0

다음으로는 zookeeper에 대한 설정으로 zookeeper.connect라는 항목에 
앞서말한 5개의 노드를 모두 적어주었다.

zookeeper.connect=NAMENODE.local:2181,SECONDARY-NAMENODE.local:2181,DATANODE1.local:2181,DATANODE2.local:2181,DATANODE3.local:2181

나는 처음에 이 부분에 적는 2181 포트가 도대체 어떤 포트인가 고민하다가 나중에 zookeeper의 cleintPort 

설정을 보고는 이해했다…-.- 그밖의 나머지 설정은 그냥 default로 두었다.


나중에 추가한 설정이 있는데 이 것은 나중에 상황을 설명하면서 함께 이야기 하겠다.

  • kafka 서버 기동

이제 kafka 서버를 기동해보자. 각각의 노드에서 /kafka/bin으로 이동한 후 다음과 같이 입력하여 기동한다.

NAMENODE:bin mazdah$ ./kafka-server-start.sh ../config/server.properties

아래와 같이 kafka 설정 정보들이 죽 나오고 이후 서버 start 메시지가 출력되면서 기동된다.

[2016-03-20 11:38:57,142] INFO KafkaConfig values: 
advertised.host.name = null
metric.reporters = []
quota.producer.default = 9223372036854775807
offsets.topic.num.partitions = 50
.
.
.

이후 테스트는 http://epicdevs.com/20의 내용을 그대로 진행해보았다.
물론 서버 정보 등은 나의 환경에 맞춰서..

  • topic 생성

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --create --zookeeper SECONDARY-NAMENODE.local:2181 --replication-factor 5 --partition 20 --topic test

그런데 처음부터 문제가 생겼다. 위와같이 시작을 했는데 아래와 같은 오류가 발생을 했다.

Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 30000
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1223)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:155)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:129)
at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:89)
at kafka.utils.ZkUtils$.apply(ZkUtils.scala:71)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
at kafka.admin.TopicCommand.main(TopicCommand.scala)

ps 명령어로 각 노드에서 확인을 해보아도 모든 zookeeper 인스턴스들이 정상적으로 떠있는 상태였는데 

zookeeper 연결시 타임아웃으로 연결할 수 없다니…ㅠ.ㅠ


일단 퍼뜩 떠오르는 생각은 zookeeper의 leader가 NAMENODE.local에 설정되어 있다는 것이다. 

아마도 topic 생성 등의 중요 작업은 zookeeper leader 노드에서 해야 하는 것 같다. 

다음과 같이 NAMENODE.local에서 topic이 생성되도록 하니 정상적으로 topic이 생성되었다.
(실제로 zookeeper의 leader node에서 실행하지 않아도 —zookeeper 옵션만leader node로 해주어도 된다). 
다음과 같이 성공하였다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --create --zookeeper NAMENODE.local:2181 --replication-factor 5 --partition 20 --topic test

Created topic "test".
  • topic 리스트 보기

자 그러면 이제 topic 리스트를 한 번 보도록 하자.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --list --zookeeper NAMENODE.local:2181

test

아직은 만들어진 topic이 test 하나밖에 없기 때문에 test만 출력된다.

  • topic 상세 보기

그리고 다음과 같이 test topic의 상세 내용을 확인해보자.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --describe --zookeeper NAMENODE.local:2181 --topic test

Topic:test	PartitionCount:20	ReplicationFactor:5	Configs:
Topic: test	Partition: 0	Leader: 0	Replicas: 0,1,2,3,4	Isr: 0,1,2,3,4
Topic: test	Partition: 1	Leader: 1	Replicas: 1,2,3,4,0	Isr: 1,2,3,4,0
Topic: test	Partition: 2	Leader: 2	Replicas: 2,3,4,0,1	Isr: 2,3,4,0,1
Topic: test	Partition: 3	Leader: 3	Replicas: 3,4,0,1,2	Isr: 3,4,0,1,2
Topic: test	Partition: 4	Leader: 4	Replicas: 4,0,1,2,3	Isr: 4,0,1,2,3
Topic: test	Partition: 5	Leader: 0	Replicas: 0,2,3,4,1	Isr: 0,2,3,4,1
Topic: test	Partition: 6	Leader: 1	Replicas: 1,3,4,0,2	Isr: 1,3,4,0,2
Topic: test	Partition: 7	Leader: 2	Replicas: 2,4,0,1,3	Isr: 2,4,0,1,3
Topic: test	Partition: 8	Leader: 3	Replicas: 3,0,1,2,4	Isr: 3,0,1,2,4
Topic: test	Partition: 9	Leader: 4	Replicas: 4,1,2,3,0	Isr: 4,1,2,3,0
Topic: test	Partition: 10	Leader: 0	Replicas: 0,3,4,1,2	Isr: 0,3,4,1,2
Topic: test	Partition: 11	Leader: 1	Replicas: 1,4,0,2,3	Isr: 1,4,0,2,3
Topic: test	Partition: 12	Leader: 2	Replicas: 2,0,1,3,4	Isr: 2,0,1,3,4
Topic: test	Partition: 13	Leader: 3	Replicas: 3,1,2,4,0	Isr: 3,1,2,4,0
Topic: test	Partition: 14	Leader: 4	Replicas: 4,2,3,0,1	Isr: 4,2,3,0,1
Topic: test	Partition: 15	Leader: 0	Replicas: 0,4,1,2,3	Isr: 0,4,1,2,3
Topic: test	Partition: 16	Leader: 1	Replicas: 1,0,2,3,4	Isr: 1,0,2,3,4
Topic: test	Partition: 17	Leader: 2	Replicas: 2,1,3,4,0	Isr: 2,1,3,4,0
Topic: test	Partition: 18	Leader: 3	Replicas: 3,2,4,0,1	Isr: 3,2,4,0,1
Topic: test	Partition: 19	Leader: 4	Replicas: 4,3,0,1,2	Isr: 4,3,0,1,2

topic 생성시 partition을 20으로 지정했기 때문에 Partition: 0부터  Partition: 19까지 20개의 목록이 보이게 된다. 

각 항목에 대해서는 추후에 설명하기로 하고 대략 이런 모습이란 것만 확인하고 넘어가자.

  • producer를 이용한 메시지 생성

다음과 같이 입력하여 message룰 생성해보자.
이 명령에서 보여지는 9092포트는 kafka 설정 파일인 server.properties에 “listeners=PLAINTEXT://:9092”로 

설정이 되어있다. 실행하면 빈 프롬프트가 생기고 적절한 메시지를 입력하면 된다. 


입력한 후 엔터를 치면 다음 메시지를 입력할 수 있다. message1에서 message6까지 메시지를 생성해보았다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-console-producer.sh --broker-list NAMENODE.local:9092 --topic test

message1
message2
message3
message4
message5
message6
  • cunsumer를 이용한 message 확인

다음과 같이 consumer를 실행하여 producer에서 생성한 메시지를 확인한다. 입력한 순서대로 출력되지 않는 것이 

아마도 Map 형식의 처리인 것으로 보인다. (가운데 빈 라인은 message 입력 제일 마지막에 엔터를 한 번 입력해서 

그렇다)

SECONDARY-NAMENODE:bin mazdah$ ./kafka-console-consumer.sh --zookeeper NAMENODE.local:2181 --topic test -rom-beginning

message5
message3

message1
message2
message4
message6
  • topic의 삭제

그냥 테스트로 만든 topic이니 미련 없이 삭제해보자. test이후에 test2라는 topic을 하나 더 만든 상태이다.
topic의 삭제는 생성과 동일한 구문으로 수행을 하며 —create 대신 —delete를 사용하면 된다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --delete --zookeeper NAMENODE.local:2181 --replication-factor 5 --partition 20 --topic test2

Topic test2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

그런데 여기서도 이상한 부분이 발견되었다. 마지막 줄에 보면 다음과 같은 내용이 있다.
Note: This will have no impact if delete.topic.enable is not set to true.

혹시나 해서 topic 리스트를 확인해보았다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --list --zookeeper NAMENODE.local:2181

test
test2 - marked for deletion

역시나 삭제한 test2 토픽에 marked for deletion라는 내용이 붙어있다. 어찌된 일일까? 

짐작했겠지만 답은 삭제시 보여진 NOTE의 내용에 있다. 설정 파일에 delete.topic.enable항목을 추가하고 

값을 true로 설정하면 된다.

delete.topic.enable=true

이후 다시 목록을 확인해보면 다음과 같이 말끔히 지워진 것을 확인할 수 있다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --list --zookeeper NAMENODE.local:2181

test

일단 오늘은 여기까지 기본적인 내용을 확인할 수 있었다.
다음 주에는 본격적으로 kafka에 대해 하나 하나 정리를 해보아야겠다.


이제야 발견한 Kafka


하던 일도 제대로 못하면서 빅데이터 공부해보겠다고 꼴깝을 떤 것이 
벌써 2013년 5월달 이야기네…


뭔가 새로운 것에 대해서는 남들 앞에서 한 마디나마 
거들 수 있어야 하지 않을까 하는 밑도 끝도 없는 초조감이
나를 뻘짓거리의 함정으로 이끌었다.


뭔가를 시작하기 전에는 지름신을 영접하는 것이 당연한(?) 의례인지라
이 때도 지름신을 조금 과하게(…ㅠ.ㅠ) 영접했다.


맥미니 5대…


그리고는 한 동안은 신났다.
Hadoop 설치하고 zookeeper 설치하고 Hbase 설치하고…
그리고…샘플 한 번 돌려보고? 끝이었나?…ㅠ.ㅠ


목표로 삼았던 것이 twitter의 데이터를 수집해서 이것 저것 분석하는
공부를 좀 해보고자 했는데…
이게 당최 감이 안잡히는 것이다.


twitter API를 통해 데이터를 가져오는 것이야 알겠는데…
이걸 어떻게 저장을 해야하는지…


처음에는 그냥 REST API를 이용해서 데이터를 가져오고
무작정 일반 txt 파일로 저장을 했다.
그러다가 Streaming API를 이용하게 되었는데 아무리 샘플 데이터라
하더라도 연속적으로 들어오는 데이터를 어떻게 저장을 해야 할지,
그리고 간단하게나마 가공을 하고자 하는데 가공을 하는 중 계속해서
들어오는 데이터는 어떻게 처리를 해야 하는지에 대한 해결책을 
전혀 몰랐다.



그렇게 3년이 지나버린 것이다.
그리고 우연히 kafka를 발견한 것이다.
프란츠 카프카도 아니고 해변의 카프카도 아니고 apache Kafka!


일단 2가지 측면에서 관심을 가지게 되었는데 하나는 Use Case 중 
Stream Processing이고 다른 하나는 Client에 node.js가 있다는 점이었다.


처음부터 twitter API를 이용하는데 node module을 사용한지라 계속해서
node module을 사용할 수있다는 것은 나에게는 큰 매리트였다.



아직은 대충 훑어본 정도라 과연 나의 목적에 딱 맞는지는 잘 모르겠으나
이 부분은 계속 적용을 해 나가면서 구체적으로 정리를 해야겠다.


당분간 주말에 할 것이 생겨서 즐겁네

+ Recent posts

티스토리 툴바