본문 바로가기
  • SDXL 1.0 + 한복 LoRA
  • SDXL 1.0 + 한복 LoRA
Study/빅데이터

[간보기 | Kafka] Kafka 무작정 실행하기

by 마즈다 2016. 4. 18.
반응형



Kafka 무작정 실행하기


참고한 자료들

지난 포스팅까지 Kafka의 개요를 알아보았다.
물론 수박 겉핥기 수준의 내용이었지만 더이상의 자세한 내용은
생략할 수밖에 없을 것 같다.


이미 많은 블로그에 보다 자세하고 정확한 내용들이 올라와있어
또다시 이 작업을 하는 것은 소모적인 일이 될 것 같아서이다.
게다가 내 짧은 지식으로는 잘못된 정보를 전달할 가능성도 높고…ㅠ.ㅠ


간단하게 참고한 블로그를 소개하자면 다음과 같다.


http://epicdevs.com/17
http://blog.embian.com/category/Apache%20Kafka
http://blog.jdm.kr/208
http://wiki.intellicode.co.kr/doku.php?id=개발:kafka


무작정 달려들기

일단 내가 kafka에 관심을 가지게 된 것은 twitter의 streamin API를
통해 가져오는 데이터를 1차 가공 후 HADOOP이나 HBase에 적재
하기 위해서였다.


어찌어찌 node.js 모듈을 구해서 twitter 샘플 데이터를 가져오는 것은
만들어놓았는데 문제는 적재 전에 가공을 하는 부분이었다.
대략 초당 2~3건 이상 들어오는 데이터를 바로바로 적재하는 것도
쉽지 않을 것 같은데 1차 가공을 해서 적재를 하려고 하니 난감했던
것이다. 그러던 차에 발견한 것이 kafka였고 얼핏 보기에 내 목적을 
충족 시켜줄 수 있을 것으로 판단되었다.


간단하게 개념을 익힌 후 기왕에 twitter 처리가 node.js 모듈로 구현
되었기에 kafka역시 node.js 모듈로 붙여보기로 했다. 
kafka 홈페이지의 클라이언트 메뉴에 있는 node.js 항목에서
kafka-node녀석을 사용하기로 했다. 이유는? 문서가 가장 잘 만들어져
있어서였다. 


(kafka-node : https://github.com/SOHU-Co/kafka-node/)


샘플 코드를 보고 구현을 해보았으나 node.js에 대한 지식도 부족한터라
생각처럼 잘 진행이 되지 않았다.


Node.js에서 막히다…

내가 구상했던 것은 당연히 다음의 진행이다.


  1. twitter 데이터 가져오기 
  2. Producer에서 메시지(1번의 twitter 데이터) 전달 
  3. topic에 적재
  4. Consumer에서 메시지 가공

그런데…
Node.js의 비동기 처리가 날 당황스럽게 했다.


우선은 기존의 twitter streaming API 처리를 하던 js에 kafka-node의
producer 샘플을 그대로 추가했다. producer는 비교적 설정이
단순한 HighLebelProducer를 사용하였다.
최초의 코드는 다음과 같이 구현되었다.


var kafka = require('kafka-node');
var Twitter = require('twitter');
var count = 0;


var client = new Twitter({
  consumer_key: '...',
  consumer_secret: '...',
  access_token_key: '...',
  access_token_secret: '...',
});

/**
*  Stream statuses filtered by keyword
*  number of tweets per second depends on topic popularity
**/
client.stream('statuses/sample', {track: ''},  function(stream){
  stream.on('data', function(tweet) {
      if (tweet.lang == 'kr' || tweet.lang == 'ko') {
        var HighLevelProducer = kafka.HighLevelProducer;
        var Client = kafka.Client;
        var client = new Client('NAMENODE.local:2181,\
            SECONDARY-NAMENODE.local:2181,\
            DATANODE1.local:2181,\
            DATANODE2.local:2181,\
            DATANODE3.local:2181');
        var topic = 'twittopic';
        var producer = new HighLevelProducer(client);
        console.log(tweet);
        console.log('===================================' + count);

        sendMessage(producer, tweet, count);

        count++;
      }
    });

    stream.on('error', function(error) {
    console.log(error);
  });
});

var sendMessage = function(producer, message, count) {
  console.log('==sendMessage=========================');

  producer.on('ready', function () {
    console.log('==producer ready=========');
    setInterval(send, 500);
  });

  function send() {
    console.log('send start');
    producer.send([
        {topic: 'twittopic', messages: [message.id] }
      ], function (err, data) {
      if (err) console.log(err);
      else console.log('send %d messages %s', count, message.id);
    });
  }

  producer.on('error', function (err) {
    console.log('error', err)
  });
};


이렇게 구현을 하는 동안 다음의 문제에 막혔다.


  • producer 객체를 언제 생성해야 하는가?

우선은 당연히 var kafka = require('kafka-node'); 이후에 이어서
관련 객체들을 생성해야 한다고 생각해서 그렇게 진행을 했더니
producer가 전혀 작동을 하지 않았다. 왜인지 아직도 원인은 모르겠다.


이후 twitter 데이터를 가져오는 부분이 
‘stream.on('data', function(tweet)’이고, ’data’이벤트가 발생하면 
콜백함수가 실행되고 이 콜백함수의 파라미터로 twitter의 데이터가 
전달된다. 따라서 나는 데이터가 전달되는 이 위치에서 producer 처리를 
해야 한다고 판단했다. 그래서 현재와 같이 구현을 하였고 일단 동작은
하였다.


  • setInterval의 있고 없음의 차이

보면 알겠지만 sendMessage 함수 내에 구현된 producer역시
‘ready’ 이벤트 발생시 콜백함수가 실행되는 구조이다. 그리고
이 안에 실제 메시지를 보내는 함수를 호출하는 코드가 구현되어있다.


그런데 이 구현에서 setInterval을 사용을 하게 되면 twitter의 콜백이
한 번 호출할 때마다 0.5초 동안 send 함수가 수십차례 호출이 되는
전형적인 비동기 호출의 문제에 빠졌다.


앞서 말했듯이 나는 twitter 메시지 한개가 들어오면 producer에서
그 메시지 1번만 보내는 것을 원했다.


다행히 아주 직관적으로 setIntervar을 제거하니 얼추 동기화가 되었다.


  • 58 제약?

일단 setInterval을 제거하고 호출을 하니 동기화는 되었는데
또다른 문제가 발생을 하였다. 58건의 메시지를 보내고 나면 더이상
메시지의 send가 이루어지지 않는 것이었다. 게다가 consumer도
하나만 메시지를 수신하고 partition 0번만 처리를 하고 있는 것이다.


혹시나 해서 setInterval을 다시 넣어보니 이 경우에도 0.5초동안
전송되는 횟수는 58회였다.


뭔지는 모르겠지만 이정도 수치에서 뭔가 제약이 걸리는 것 같은데
이유를 모르겠다(다음 포스팅에 해결책만 적겠다).


어쨌든 여기까지로 twitter로 메시지를 받고 producer로 topic에
쌓고 consumer로 메시지를 소비하는 것까지의 기본 동작은 확인을
했다. 


다음 포스팅에서는 이 소스를 약간 변형시켜서 애초에 구상했던
내용에 근접하는 동작을 구현한 코드를 가지고 이야기를 풀어나가자. 

반응형