Cluster : The Beginning - Apache Kafka와 EMQ 연동


HAProxy, EMQ, Kafka 설치를 통해 일단 입구는 마련이 되었다…고 생각했으나 아직 한 가지 남은 것이 있었다.
단순한 메신저 기능이나 M2M 통신만을 위한 것이 아니라 데이터 분석이 목적이라면 EMQ를 통해 들어온 데이터를 
저장해야 할텐데 아직 그 기능이 구현되지 않았다.


EMQ의 토픽을 영구 저장소에 저장하는 다른 방법이 있는지는 모르겠으나 나의 경우 일단 Kafka가 설치되어있고
곧 Hadoop과 HBase를 설치할 예정이기에(2018년 1월 2일 현재 설치 완료됨) 다음과 같은 경로로 저장을 하기로
했다.




그리 어렵지 않은 내용이니 간단하게 소개하고 마치도록 하겠다.


1차시기 실패 : emqttd_plugin_kafka_bridge


우선 처음 검색해서 찾아낸 것이 emqttd_plugin_kafka_bridge란 놈이었다. 이름으로 알 수 있듯이 EMQ와의
연동을 고려하여 만들어진 것 같은데…어쩐 일인지 빌드가 되지 않았다. 우선은 라즈베리파이라서 그런가보다 하고
넘어 가고 다른 대안을 찾기 시작했다. mqttKafkaBridge와 같은 몇몇 도구들도 찾아보았으나 라즈베리파이에서는
잘 실행이 안되었다 (아래 링크는 시도를 해보았거나 검색 결과로 찾은 도구들이다). 



결국 아래 링크된 페이지를 참조하여 kafka에 있는 connect-standalone.sh를 이용하여 연동하는데 성공하였다.


https://howtoprogram.xyz/2016/07/30/apache-kafka-connect-mqtt-source-tutorial/


connect-standalone.sh 이용하기


우선 나의 경우 위에 링크한 페이지를 참고로 하여 mqtt 연동을 위한 설정은 mqtt.properties 파일에 
아래와 같이 하였다. 이 파일은 $KAFKA_HMOE/config에 위치해야 한다.

# 이 커넥터의 이름과 커넥터로 사용할 클래스 그리고 task의 최댓값을 설정한다.
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1

# kafka쪽에서 사용할 토픽 이름이다.
kafka.topic=mqtt-kafka

# mqtt에 연결할 kafka broker의 아이디이다. 나머지 2대의 kafka broker에는 각각
# mqtt-kafka-2, mqtt-kafka-3으로 설정이 되어있다.
mqtt.client_id=mqtt-kafka-1

# 연결 관련 설정으로 위에 링크한 참조 페이지 내용을 그대로 사용하였다.
mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60

# MQTT 관련 설정. mqtt.server_uris는 나의 경우 EMQ를 노드 2개의 클러스터로 구성하였고
# 부하 분산을 위해 HAProxy가 설치된 서버 주소로 설정하였다. 그리고 EMQ에서 사용할 토픽은
# /mqtt로 설정하였다.
mqtt.server_uris=tcp://172.30.1.23:1884
mqtt.topic=/mqtt


이와 같이 설정한 후 connect-standalone.sh를 다음과 같이 실행한다(물론 이미 zookeeper와 kafka는 
실행이 된 상태여야 한다).

$ cd $KAFKA_HOME/bin
$ ./connect-standalone.sh config/connect-standalone.properties config/mqtt.properties


connect-distribute.sh도 있는데 standalone과 어떤 차이가 있는지 잘 모르겠다. 추후 다시 확인을 해봐야겠다.


연동 결과


connect-standalone.sh를 실행하고 나면 잠시 후 EMQ의 웹 화면에 아래와 같이 mqtt.properties에 설정한
mqtt.client_id 값들이 메뉴 곳곳에 보이는 것을 확인할 수 있다.


EMQ에 연결된 클라이언트로 mqtt.properties에 설정한 클라이언트 Id가 보인다.

현재 연결된 세션에도 역시 클라이언트 Id가 보인다.


마찬가지로 구독자 목록에도 클라이언트 Id가 보이며 구독하는 토픽은 mqtt.topic에 설정해준 값인 /mqtt
보인다.


마지막으로 Kafka Manager에서 Topic List메뉴를 보면 mqtt.properties의 kafka.topic 항목에 설정한
mqtt-kafka가 보이는 것을 확인할 수 있다.




정리


앞서 정리한 내용과 마찬가지로 세세한 부분까지 확인하지는 못했지만 우선 Kafka와 EMQ를 연동하는 것 까지
작업을 완료 하였다. 이제 EMQ -> Kafka -> Hadoop(HBase)로 이어지는 프로세스에는 아두이노로 만들
온습도계의 데이터를 저장할 것이고 Kafka Producer -> Kafka -> Hadoop(HBase)로 이어지는 프로세스에는
일반 텍스트 데이터를 저장하게 될 것이다.


이렇게 해서 데이터 컬렉션을 담당할 부분은 모두 설치가 완료가 되었다.
다음 시간부터는 데이터 저장 및 분석을 위한 Hadoop과 HBase 설치에 대한 내용을 정리하고 그 이후 데이터의 
실시간 분석 및 머신 러닝을 위한 Apache Spark 설치에 대한 내용으로 마무리를 하도록 하겠다.


어느덧 해가 바뀌었고 다행히 설치 자체는 순조롭게 진행이 되고 있다.
부디 늦지 않게 데이터가 수집되고 수집한 데이터를 분석하는 것 까지 정리 할 수 있기를 바란다.


이 글을 보고 계시는 분들께 새해 인사 전하면서 이만 줄인다.


무술년 한 해 모두 행복하시기를 바랍니다.
새해 복 많이 받으세요!!! 






블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^








Cluster : The Beginning - Raspberry pi에 Apache Kafka 설치하기


이전 작업으로 일단 기본적인 출발은 마무리가 되었다.
하지만 EMQ를 설치한 내용에서 언급했듯이 MQTT는 경량화 프로토콜로 주로 IoT에 특화되어 있다고 볼 수 있어
(Facebook Messanger에서 MQTT를 사용한다고 하는데 현재형인지 또 어떤 영역에 어떻게 사용하는지는 잘
모르겠다) 아직은 절반의 성공일 뿐이다.


센서 데이터 분석을 위한 환경 뿐만 아니라 일반적인 데이터 분석에 대한 환경을 갖추기 위해서는 MQTT라는 진입점
만으로는 부족한 것이다. 그래서 일반 데이터를 수집하는 부분은 예전에 한 번 시도를 해보았던 Apache Kafka를
이용하기로 했다. Kafka를 이용해 수집할 데이터도 예전과 마찬가지로 트위터의 데이터가 될 것이다. 다만 클라이언트는
예전에는 Node.js를 이용하여 구현했으나 이번에는 다른 방식을 찾아볼 생각이다.


이번 포스팅은 바로 이 Apache Kafka를 라즈베리파이에 설치하고 구동하는 과정을 정리해보겠다.
사실 2016년도에 정리한 내용의 축약 버전이나 다름없어 마지막 정리에 2016년에 포스팅한 내용을 모두
링크하였으니 참고하시길 바란다.


개요 - Kafka는 MQTT와 뭐가 다른가?


일단 Kafka 역시 Message Queue기반의 시스템이다. 용어의 차이는 있지만 대체로 구성이 비슷하다.




아주 심플하고 직관적인(하지만 정확하진 않은…-.-) 비유를 들자면 다른 Message Queue 시스템을 퀵서비스라고
한다면 Kafka는 택배라고 할 수 있을 것이다. 퀵서비스는 작은 물건이나 서류를 송신자로부터 수신자 에게 직접 전달을
해주지만 택배는 큰 덩치의 물건들을 물류창고에 집하했다가 다시 배송을 한다. 하지만 이 것은 어디까지나 간단한 비유고
자세한 차이점은 아래 블로그에 잘 정리가 되어있다.

http://epicdevs.com/17


일단 Kafka는 용량이 큰 데이터 전송에 유리하고 클러스터를 통해 데이터를 ‘복제’해둘 수 있으며, Message Queue가
broker에서 subscriber로 topic을 push해주는 반면 kafka는 consumer가 필요할 때 broker로부터 pull 방식으로
가져다 쓸 수 있다는 차이 정도만 알아두면 될 것 같다.


zookeeper 설치, 설정, 실행


개요에서 설명한 것과 같이 Kafka의 경우 클러스터를 구성하여 분산처리를 할 수 있으며 전송되는 데이터를 여러 노드에
복제해놓을 수도 있다. 하지만 이러한 분산 처리를 하기 위해서는 zookeeper라는 분산 코디네이터의 도움을 받아야
한다. 즉, Kafka를 사용하기 위해서는 zookeeper를 먼저 설치해야 한다는 뜻이다.


라즈베리파이에 zookeeper 설치는 매우 간단해서 그냥 바이너리 배포판을 다운로드 받아 적당한 위치에 압축을
풀고 환경에 맞게 설정한 후 실행을 하면 된다…-.-


나는 일단 3.4.10 버전을 받아서 /opt/zookeeper에 압축을 풀었다.
설정은 딱 3가지만 하면 된다.

#데이터를 저장할 디렉토리를 설정한다.
dataDir=/var/lib/zookeeper

#간단하게 기본 설정 파일에서 주석만 풀어주면 된다. 
#주석 처리되어있으면 서버간 통신 때 connection refused가 발생한다.
maxClientCnxns=60

#zookeeper의 클러스터는 별도로 앙상블이라고 불리우는데 앙상블을 구성할 서버 주소를 적어준다.
server.0=172.30.1.54:2888:3888
server.1=172.30.1.13:2888:3888
server.2=172.30.1.42:2888:3888


zookeeper 앙상블이 정상적으로 실행되기 위해서는 dataDir에 지정된 경로에 myid 파일이 필요하며 이 파일에는
3번째 서버 설정에서 정의된 서버 ID가 적혀있어야 한다. 위 설정을 기준으로 보자면 server.0 서버에는 0이,
server.1 서버에는 1이, server.2 서버에는 2가 적혀 있어야 한다.


앙상블(클러스터)을 구성하는 모든 서버에 동일한 설정을 해주고 나서 각 서버에서 아래와 같이 zookeeper 서버를
실행해준다.

$ $ZOOKEEPER_HOME/bin/zkServer.sh start


한가지 주의할 사항은 클라이언트의 요청을 처리하는데 있어서 leader 역할을 하는 한 대의 노드에서만 읽기와 쓰기가
모두 가능하다. follower에서는 오직 읽기만 처리 가능하며, 만일 쓰기 요청이 오면 각 follower 노드들은 그 요청을
leader 노드에 위임하게 된다.


어느 서버가 leader고 어느 서버가 follower인지는 zookeeper에서 확인 가능한데 다음 명령어로 확인 가능하다. 
아래는 현재 노드가 follower임을 보여준다.




zookeeper에 대한 기본적인 내용은 이 것이 전부다. 하지만 분산 시스템을 관리한다는 본연의 임무를 생각해본다면
zookeeper에 대해 알아야 할 내용은 상당히 많다. 또한 zookeeper API를 이용하면 zookeeper를 통해 관리되는
분산 시스템을 직접 만들 수도 있다. 한마디로 zookeeper에 대한 것만 공부하자고 해도 상당히 많은 시간이 필요하므로
여기서는 이정도로 마무리 하겠다.


Kafka 설치


지금껏 진행해온 다른 시스템 설치와 마찬가지로 설치 자체는 매우 간단하다. 바이너리 배포본을 다운로드 한 후
압축을 풀고, 설정하고, 실행하면 된다.


나의 경우 일단 Kafka는 2.11-1 버전을 다운로드 받았고 /opt/kafka에 압축을 풀었다.




Kafka 설정


Kafka의 설정 파일 위치는 다음과 같다. 나는 /opt/kafka에 설치를 했으니 /opt/kafka/config 아래에 있는
server.properties를 수정하면 된다.




이전과 마찬가지로 반드시 설정해야 할 내용만 정리해보자.

#앞서 zookeeper 설정에서 설명한 서버 아이디를 적어준다. 여기는 172.30.1.54 서버이므로
#server.0=172.30.1.54:2888:3888 설정을 참고하여 0으로 설정하였다.
broker.id=0

#로그를 기록할 경로를 지정한다.
log.dirs=/var/lib/kafka-logs

#topic을 저장할 파티션을 몇개로 할 지 지정한다. 서버가 3대이니 일단 3으로 지정해보았다.
#이렇게 설정하면 하나의 데이터 파일이 3개로 쪼개져서 저장이 된다.데이터 파일이 Topic으로 들어오는 데이터가       #3영개의 영역으로 나뉘어서 저장이 된다.
#하지만 partition이란 하나의 Topic을 몇 개로 나눌지를 결정하는 것이지 #물리적 서버의 댓수와는 상관이 없다. num.partitions=3 #데이터 파일의 복제본을 몇개나 가지고 있을지 설정한다. 3으로 설정했으니 3개의 복제본이 존재하게 된다. offsets.topic.replication.factor=3 #클러스터 관리를 위한 zookeeper 서버 목록을 적는다. zookeeper 설정에서는 IP 주소로 설정했는데 #여기서는 host 이름으로 설정하여 일관성이 없는 설정이 되긴 했지만...-.- #각 서버는 다음과 같이 매핑되니 참고하시길 바란다. #rpi1=172.30.1.54, rpi2=172.30.1.13, rpi3=172.30.1.42 zookeeper.connect=rpi1:2181,rpi2:2181,rpi3:2181


위 내용만 설정하면 kafka 서버를 실행할 수 있다.


마지막으로 num.partitions과 offsets.topic.replication.factor 설정이 어떻게 반영되는지에 대해 아래와 같이
그림으로 간단하게 표현을 할 수 있다.




위 그림을 설명하자면 다음과 같은 구조의 경우라 볼 수 있다.

  • 3개의 노드 
  • 3개의 파티션(num.partitions=3)
  • 3개의 복제본(offsets.topic.replication.factor=3)


여기서 주의해서 볼 것은 leader와 follower로 항상 leader에만 쓰기가 가능하며 leader에 데이터가 기록되면
기록된 데이터가 follower로 복제된다는 것이다.


보다 상세한 내용은 좀 더 공부를 한 후 다시 정리해보도록 하겠다.


실행


실행은 /opt/kafka/bin경로로 이동하여 다음과 같이 입력하면 된다.

$ ./kafka-server-start.sh ../config/server.properties


만일 JMX를 이용한 모니터링 도구를 이용하고자 한다면 다음과 같이 실행한다.

$ env JMX_PORT=9000 ./kafka-server-start.sh ../config/server.properties


이렇게 실행을 한 후 아래 2개의 모니터링 도구를 사용할 수 있다.

Kafka Offset Monitor : http://quantifind.github.io/KafkaOffsetMonitor/

Kafka Manager : https://github.com/yahoo/kafka-manager


주의 사항


2016년도에 Kafka를 설치할 때는 맥미니에 설치하였지만 이번에는 라즈베리파이에 설치를 하였다.
이 차이는 결코 작은 차이가 아니다. 하드웨어 사양으로 인해 라즈베리파이에서는 사용할 수 있는 자원이
매우 한정적인 것이다. 실제로 라즈베리파이에서 kafka를 실행했을 때 잦은 빈도로 메모리 부족 현상이
발생을 하였다. 따라 일단 라즈베리파이에서 안정적으로 실행을 하기 위해서는 kafka-server-start.sh 파일의
다음 부분을 찾아 Xmx와 Xms를 적절히 수정해주어야 한다. 기본 값은 1G이다.

export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"



정리


사실 라즈베리파이에서 설치 및 구동을 한 것 외에는 2016년도에 정리했던 내용과 별 다른 점이 없다.
다시 정리한다는 것이 중복된 작업일 수도 있으나 복습한다는 의미로 한 번 더 정리해보았다.
그런 의미에서 가장 중요한 부분은 ‘주의 사항’이 아닐까 싶다.


가장 기초적인 설치,설정,실행 부분만 짚고 넘어갔으니 2016년도의 Kafka 관련 모든 글을 링크하면서
이번 포스팅을 마칠까 한다.











블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^


Kafka 정리를 마치며


분산 시스템 관리의 어려움

얼추 node 모듈을 이용한 kafka 서비스가 구현이 된 것 같았다.
트위터 Streaming API를 이용하여 데이터를 잘 가져오고,
producer는 이 데이터를 broker에게 잘 전달하고,
consumer는 broker로부터 데이터를 잘 가져와 로그를 뿌려주고…


하지만 어느 순간 이러한 프로세스가 중지되어있기 일쑤였다.
zookeeper쪽이나 kafka쪽이나 서버 콘솔에 출력되는 로그는
대체로 네트워크가 끊겼다는 메시지인데 도대체 이 문제가 어떤 원인으로
발생하는 지를 알 수가 없는 것이다.


애초에 분산 시스템에서 장애의 원인을 찾는 것은 매우 어려운 일이라는 것은
알고 있었지만 아무리 작은 클러스터라도 이 문제를 직접 겪으니 참
답이 안나온다.(물론 나의 경험과 지식의 부족이 가장 큰 역할을 했겠지만…ㅠ.ㅠ)


물리적인 네트워크가 문제인지, zookeeper에서 문제가 발생한 것인지
kafka에서 문제가 발생한 것인지…게다가 zookeeper와 kafka의 장애에 대한
상세한 자료들은 찾기가 쉽지 않아서…


결국 이 문제로 거의 2주 가량을 별다른 진척 없이 zookeeper와 kafka의
에러 로그에 대한 구글링만 하면서 보냈다.


혹시나 해서 몇대 안되는 클러스터에서 잔뜩 돌아가고 있던 HBase와
Storm 서버들도 모두 죽여버렸다.

최종적으로 구성된 나의 허접한 클러스터는 아래와 같다.



<가난한 자의 클러스터 이미지>


황당한 원인과 새로운 문제


사실 문제는 너무나 명백한 곳에 있었다.
현재 총 5대의 맥미니로 구성된 클러스터에서 4대는 서버 전용으로만
사용했으나 1대를 일반 가정용 용도로도 사용을 하는 과정에서 서버 전용의
4대는 절전 모드를 꺼놓았는데 이 한 대에 대해서는 절전모드를 켜놓은
상태였던 것이다. 그러니 절전모드로 들어가면서 이 한대에서 돌고있던
zookeeper, kafka는 물론 여기서 돌고 있던 node.js의 producer 모듈까지
모두 맛이 가버린 것이다. 


결국 전기세의 압박에도 불구하고 절전모드를 모두 해제하고 상황을 지켜보았더니
트위터 메시지 건수 기준으로 기존에 4~5천 건에서 죽던 프로세스가
대략 7만 건 이상을 처리할 수 있게 되었다.


하지만 아직도 갈 길이 먼 것이 약 7만 건 정도 처리를 하고 나면 zookeeper에서
Purge task가 발생을 하는데 이 시점에서 producer 프로세스가 멈춰버린다.
zookeeper의 purse 관련 설정에 대해 알아보고는 있으나 역시 자료도 많지 않고
나의 무식은 큰 장애가 되고 있고…ㅠ.ㅠㅠ


일단 다음 단계로


내가 하고자 하는 것은 트위터 데이터를 모아 형태소 분석을 거쳐 특정 시점에
가장 많이 언급된 단어들을 추출하고 그 단어에 대한 긍정/부정의 평가를 한 후
다시 그 단어가 언급된 공인 미디어를 검색하여 긍정/부정 평가에 대한
공신력을 추가하는 작업이다.


그 중에 이제 데이터 수집 단계를 진행하고 있으며 기왕이면 공부좀 해보자고
kafka에 손을 대본 것인데 역시 한계가 있다. 하지만 목표한 바를 진행하면서
최대한 틀린 부분을 바로 잡고 몰랐던 것을 채워 나가야겠다.


당장에 진행할 다음 단계는 현재 일없이 로그만 찍어대고 있는 consumer에
제대로 된 역할, 즉 Hadoop으로 파일을 저장하는 일을 좀 시키려고 한다.
역시 node 모듈을 사용할 것이고 그 과정 또한 차근차근 정리해 볼 생각이다.


선무당이 사람 잡는다.

잘 모르는 내용을 억지로 진행하다보니 잘못된 정보를 기록하게 되는 경우도
많은 것 같다. 앞으로는 가급적 핵심적인 내용은 잘 정리된 외부 사이트를
인용을 하고 내가 실제 눈으로 본 것들을 중심으로 정리를 해야겠다.

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^



소스 정리와 모니터링 툴


소스 정리

일단 급하게 기능을 확인하다보니 소스 코드가 엉망이다.
조금이나마 다듬어야 보기가 편할 것 같아 쉬어갈 겸 
우선 node 소스들을 정리했다.


tmgetter.js 
(트위터 메시지를 받아서 콜백 함수를 통해 topic으로 메시지를
보내는 모듈)

var Twitter = require('twitter');
var count = 0;


var client = new Twitter({
  consumer_key: '...',
  consumer_secret: '...',
  access_token_key: '...',
  access_token_secret: '...',
});

var msgArr = new Array();

/**
*  parameter
* msgCount : kafka 토픽으로 보낼 메시지 배열의 수
* sendMessage : kafka 토픽으로 메시지를 보낼 콜백 함
**/
var getTwitterMessage = function(kafka, msgCount, sendMessage) {
   /**
   * Stream statuses filtered by keyword
   * number of tweets per second depends on topic popularity
   **/
   client.stream('statuses/sample', {track: ''},  function(stream){
      stream.on('data', function(tweet) {
        if (tweet.lang == 'kr' || tweet.lang == 'ko') {
          if (count % msgCount == 0) {
            // 메시지 배열에 값이 들어있는 경우 초기화 시
            if (msgArr.length > 0) {
              while(msgArr.length > 0) {
                msgArr.pop();
              }
            }
          }

          var messageStr = extractMessage(tweet);

          msgArr.push(messageStr);

          if ((count % msgCount) == (msgCount - 1)) {        
            if (typeof sendMessage === 'function') {
              var Producer = kafka.Producer;
              var Client = kafka.Client;
              var client = new Client('NAMENODE.local:2181,SECONDARY-NAMENODE.local:2181,DATANODE1.local:2181,DATANODE2.local:2181,DATANODE3.local:2181');

              var topic = 'twittopic';
              var producer = new Producer(client, { requireAcks: 1 });

              sendMessage(producer, msgArr, count);
            }
          }
          count++;
        }
      });

      stream.on('error', function(error) {
        console.log(error);
      });
    });

}

// 트위터 메시지에서 필요한 부분만 추출하는 함수
var extractMessage = function(tweet) {
  var messageObj = {};

  messageObj.createdAt = tweet.created_at;
  messageObj.id = tweet.id;
  messageObj.idStr = tweet.id_str;
  messageObj.text = tweet.text;
  messageObj.UserScreenName = tweet.user.screen_name;
  
  messageObj.retweetCreatedAt = '';
  messageObj.retweetId = '';
  messageObj.retweetIdStr = '';
  messageObj.retweetText = '';
  messageObj.retweetUserScreenName = '';
  messageObj.retweetCount = '';
  
  if (tweet.retweeted_status) {
    messageObj.retweetCreatedAt = tweet.retweeted_status.created_at;
    messageObj.retweetId = tweet.retweeted_status.id;
    messageObj.retweetIdStr = tweet.retweeted_status.id_str;
    messageObj.retweetText = tweet.retweeted_status.text;
    messageObj.retweetUserScreenName = tweet.retweeted_status.user.screen_name;
    messageObj.retweetCount = tweet.retweeted_status.retweet_count;
  }
  
  return JSON.stringify(messageObj)
}

exports.getTwitterMessage = getTwitterMessage;

sender.js
(트위터 메시지를 topic에 전달할 모듈)

var sendMessage = function(producer, msgArr, count) {

  console.log('==start sendMessage=================================================================');

  producer.on('ready', function () {
    console.log('-> producer ready');

    var p = count % 3;
    producer.send([
        {topic: 'twittopic', partition: p, messages: msgArr, attributes: 0 }
      ], function (err, data) {
          if (err) {
            console.log('-> Error : ' + err);
          }
          else {
            console.log('-> send %d messages', count);
            console.log('-> send to %d Partition', p);
          }

          producer.close(function() {
            console.log('-> close produce');
            console.log('==end sendMessage===================================================================');
          });
    });
  });

  producer.on('error', function (err) {
    console.log('error', err)
  });
};

exports.sendMessage = sendMessage;

index.js
(실행 모듈)

var kafka = require('kafka-node');

var tmgetter = require('./tmgetter');
var sender = require('./sender');
// producer를 별도의 모듈로 만들고 오브젝트를 생성하여 사용하고자 했으나
// 이렇게 하면 무슨 이유인지 producer가 작동하지 않는다.
//  producer의 ready 이벤트가 어떤 조건에서 발생하는지 모르겠다...ㅠ.ㅠ
//var producer = require('./producer');


tmgetter.getTwitterMessage(kafka, 20, sender.sendMessage);

일단 이렇게 소스는 조금 정리를 했다.
Producer쪽 로그도 조금 깔끔하게 다듬었다.



모니터링 툴

Kafka Manager


비교적 다양한 정보를 볼 수 있는 장점이 있다. 물론 그만큼 설정이
복잡하다. 물론 기본적인 설정만으로도 웬만한 정보는 확인 가능하며
지난 포스팅에 언급한 것 처럼 JMX 기반으로 정보를 확인하고자 하면
kafka 서버 실행시 다음과 같이 JMX 포트를 명시해주어야 한다.


env JMX_PORT=9999 ./kafka-server-start.sh ../config/server.properties


설치 및 실행은 지난 포스팅에 링크한 github 페이지를 참고하도록 하고
오늘은 화면 설명만 간단하게 하겠다.


클러스터 설정 화면


일단 가장 기본적인 Cluster Name을 정해주고
Cluster Zookeeper Hosts설정을 해준다. 제목대로 Zookeeper 노드의 
주소와 포트를 넣어주면 된다. 다음 Kafka Version을 선택하고 마지막으로 
JMX 관련 설정을 하면 기본적인 설정은 끝이다. 나머지 설정들은 기본값으로…
설정이 끝났으면 화면 맨 아래의 SAVE 버튼을 누르자.


설정을 저장하고나서 상단 Cluster메뉴에서 List를 선택하면
아주 심플한 화면에 방금 설정을 마친 클러스터 이름이 보인다.

이 클러스터 이름을 클릭하면 다음과 같은 화면이 나온다.


여전히 화면은 심플하지만 상단에 뭔가 메뉴가 잔뜩 생겼다.
사실 상단 메뉴의 대부분은 뭔가를 생성하거나 변경하는 내용들이라
아직 직접 사용해 보지는 않았다. 이제 친숙한 용어들을 눌러보자


예상했듯이 topic 목록이 나온다. 나는 토픽을 1개만 생성을 했기 때문에
하나의 항목이 보이고 보다시피 topic 이름은 ‘twittopic’이다.
내용을 보면 알겠지만 3개의 partition이 있고 5개의 broker가 구동중이며
3개의 복제본이 있다. 다른 항목들은 그냥 보면 되겠는데 
Brokers Skew %라는 것이 뭔지 도통 모르겠다.
topic 이름을 클릭해보자


뭔가 조금 복잡한 화면이 나왔지만 살펴보연 목록에 나왔던 내용들의

반복에 특정 시간 간격에서의 전송량 등 쉽게 알 수 있는 내용들이
표시된다. 특히 Partitions by Broker를 보면 topic의 각 partition이
전체 broker에 어떻게 복제가 되어있는지 알 수 있다.
여기도 역시 Skewed라는 항목이 있는데 skewed의 사전적 의미에
‘왜곡된’, ‘편향된’ 이라는 의미가 있고 Skewed가 id 2번의 브로커만
true이면서 3개의 partition의 복제본이 모두 존재하는 상황으로
미루어보건데 partition의 복제본이 정도 이상으로 집중된 상황을
표시하는 것 같다. 만일 복제본을 5개로 설정했다면 전체 Broker가
모두 Skewed 값이 true가 되고 Broker Skewed %도 100%가
되지 않알까 싶다. 요건 추후 테스트해봐야겠다.


만일 JMX를 사용하지 않게 되면 Metrics 영역에 보이는 회색 타원
영역의 값들이 표시되지 않는다.


Consumers consuming from this topic는 Consumer 그룹을
보여준다. 나는 하나의 컨슈머 그룹만 설정을 했고 그 이름을
kafka-node-group으로 했기에 하나만 보여지고 있다. 한 번 눌러보자


현재 topic partition이 3개이고 consumer가 하나의 그룹에 3개의
인스턴스로 존재하기 때문에 각각의 partition과 consumer는 1:1로
연결이 된다.


다시 돌아가서 이번에는 Broker를 한 번 확인해보자. 현재 3개의 
복제본이 모두 저장되어있는 2번 Broker를 클릭해보겠다.


역시 조금 복잡해보이지만 앞의 정보들이 반복해서 보여진다.
처음으로 그럴듯한 챠트가 하나 나왔다. 시간 흐름에 따라 처리되는
메시지의 양을 보여주는 그래프다. Per Topic Detail영역을 보면
토픽에 대한 정보를 볼 수 있는데 앞의 메뉴에서 이미 확인된 내용들이다.


이렇게 뭔가 화면이 많고 복잡해 보이기는 하나 대부분 중복된
내용이 많다. 그리고 JMX를 이용하지 않을 경우 처리량에 해당하는 정보
(회색 타원 영역과 그래프 등)는 볼 수 없다.


Kafka Consumer Offset Monitor


이 툴은 이름에서도 알 수 있듯이 Cunsumer가 message를 소비하는데
초점이 맞춰져있다. 굉장히 심플하다. 별다른 설정도 필요 없다.
메인 메뉴는 다음과 같다.


Consumer Groups를 선택한 화면이다.
앞서 언급한 것처럼 나는 kafka-node-group이라는 하나의 그룹만을
가지고 있다.


kafka-node-group을 눌러보자



아주 깔끔하게 Broker 목록과 Consumer의 message 소비 상황이보인다.


전체 Broker는 5대의 node(Mac mini)에서 실행되고 있지만 
Consumer는 2, 3, 4번 Broker에서만 실행되고 있기 때문에 Broker
목록에 2, 3, 4번만 보인다.


Consumer Offsets 영역에는 partition 번호와 소비한 message
offset과 size 등의 정보가 보여진다. 그리고 topic 이름인 twittopic을
클릭하면 아래와 같은 그래프가 표시된다.



zoom에서 선택한 시간 동안의 message 소비의 변화량을 보여준다.


메인 메뉴의 2번째 메뉴인 Topic List 메뉴도 결국에는 이 화면으로 
들어오게 된다.


마지막의 Visualizations 메뉴는 클러스터의 구조를 트리 형태로
보여주는 메뉴이며 다음 2개의 화면을 제공한다.


이상과 같이 이번 포스팅에서는 간단하게 2개의 kafka 모니터링 툴에 대해
알아보았다. 아직 잘 모르는 항목들과 기능 버튼들이 있는데 이런 부분들은
추후 확인되는대로 내용을 보완해나가도록 하겠다.


블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^



Kafka 무작정 실행하기 - 2


58의 비밀

먼저 지난 번 마지막에 언급했던 58이란 숫자의 비밀을 밝혀보자.
사실 정확한 원인은 아직 확인 못했다. 다만 지난 번 코드의 구현이 ’트위터
메시지가 하나 들어올 때마다 producer 하나를 만들어 트위터 메시지를
topic에 보낸다’
는 것이었다.


이 과정에서 의심할 수 있는 것은 매번 producer를 만들어 커넥션울 하게 되니
아마도 이 커넥션 수에 제한이 걸려버린 것이 아닐까 하는 부분이었다.


그래서 일단 직감에 의존해 producer에서 topic으로 메시지를 보낸 후
API의 close 함수로 연결을 끊어보았다. 예상이 적중하였는지 이후로는
58개의 제한에 걸리지 않고 트위터에서 받아오는 모든 메시지들이
정상적으로 전송이 되었다.



성능 관리

겨우 이만큼의 구현과 설정을 해놓고 성능 관리를 논하는 것은 우스운 일이지만
일단 트위터 메시지 한 건에 커넥션 하나를 만들었다가 끊는다는 것은 아무리
생각해도 그리 바람직하지는 않다고 판단하였다. 


그리서 우선 아쉬운대로 트위터 메시지 10건이 수신되면 그 10개를 하나의
배열로 하여 producer를 통해 보내도록 하였다. 우선은 시험 삼아
이정도 처리를 한 것이고 시스템 환경에 따라 아마도 이러한 수치를 세밀하게

조절을 해야 할 것이다.


그리고 실제로 topic으로 메시지를 보내는 위치에서 파티션(나는 3개의 
partition 나뉜 1개의 topic을 만들었다)에 고르게 분산시키기 위해
파티션의 인덱스를 계산하는 식을 하나 추가하였다.


  • 처음에는 손이 덜가는 HighLevelProducer를 사용했었으나 뭔가 잘
    작동하지 않는 듯하여 그냥 Producer로 바꾸었다.

수정된 소스

일단 지난 시간의 문제점을 수정하고 몇가지 필요한 내용을 추가한
소스를 다음과 같이 만들었다.

var kafka = require('kafka-node');


var Twitter = require('twitter');
var count = 0;


var client = new Twitter({
  consumer_key: ‘…’,
  consumer_secret: ‘…’,
  access_token_key: ‘…’,
  access_token_secret: ‘…’,
});

var msgArr;

/**
 * Stream statuses filtered by keyword
 * number of tweets per second depends on topic popularity
 **/
client.stream('statuses/sample', {track: ''},  function(stream){
  stream.on('data', function(tweet) {
    if (tweet.lang == 'kr' || tweet.lang == 'ko') {
      if (count % 10 == 0) {
        msgArr = new Array();
      }

      var messageObj = {};

      messageObj.createdAt = tweet.created_at;
      messageObj.id = tweet.id;
      messageObj.idStr = tweet.id_str;
      messageObj.text = tweet.text;
      messageObj.UserScreenName = tweet.user.screen_name;

      messageObj.retweetCreatedAt = '';
      messageObj.retweetId = '';
      messageObj.retweetIdStr = '';
      messageObj.retweetText = '';
      messageObj.retweetUserScreenName = '';
      messageObj.retweetCount = '';

      if (tweet.retweeted_status) {
        messageObj.retweetCreatedAt = tweet.retweeted_status.created_at;
        messageObj.retweetId = tweet.retweeted_status.id;
        messageObj.retweetIdStr = tweet.retweeted_status.id_str;
        messageObj.retweetText = tweet.retweeted_status.text;
        messageObj.retweetUserScreenName = tweet.retweeted_status.user.screen_name;
        messageObj.retweetCount = tweet.retweeted_status.retweet_count;
      }

      var messageStr = JSON.stringify(messageObj)
      console.log(messageStr);
      console.log('===================================================================' + count);

      msgArr.push(messageStr);

      if (count % 10 == 9) {
        console.log('==call sendMessage====================================================');
        var Producer = kafka.Producer;
        var Client = kafka.Client;
        var client = new Client('NAMENODE.local:2181,SECONDARY-NAMENODE.local:2181,DATANODE1.local:2181,DATANODE2.local:2181,DATANODE3.local:2181');

        var topic = 'twittopic';
        var producer = new Producer(client, { requireAcks: 1 });

        sendMessage(producer, msgArr, count);
      }
      count++;
    }
  });

  stream.on('error', function(error) {
    console.log(error);
  });
});

var sendMessage = function(producer, msgArr, count) {

  console.log('==sendMessage=================================================================');

  producer.on('ready', function () {
    console.log('==producer ready=========');
    console.log('send start');

    var p = (count - 9) % 3;
    producer.send([
      {topic: 'twittopic', partition: p, messages: msgArr, attributes: 0 }
    ], function (err, data) {
      if (err) console.log(err);
      else console.log('send %d messages', count);

      producer.close(function() {
        console.log('close produce');
      });
    });
  });


  producer.on('error', function (err) {
    console.log('error', err)
  });
};

모니터링

일단 내용 구현을 하고 실제 작동하는 것 까지 확인을 하였으니 이제제대로
뭔가를 하고 있는 것인지 궁금해졌다. 일단 Kafka 자체적으로는 달리 웹으로 
구현된 관리 화면을 제공하지 않기에 잘 알려진 모니터링 도구 2가지를 
설치해보았다.


Kafka Offset Monitor : http://quantifind.github.io/KafkaOffsetMonitor/

Kafka Manager : https://github.com/yahoo/kafka-manager


이중 Kafka Manager는 Kafka에서 제공하는 JMX 인터페이스를 이용하는
툴로 이 툴을 이용하기위해서는 Kafka 서버 시작 시 JMX 포트를 함께 지정해
주어야 한다.

env JMX_PORT=9999 ./kafka-server-start.sh ../config/server.properties


다음은 각 모니터링 툴의 실제 화면이다.

  • Kafka Offset Monitor


  • Kafka Manager

그리고 다음은 node.js를 통해 실행한 Producer와 Consumer의 콘솔 화면이다.

아직 Consumer에서는 별다른 메시지 가공은 없이 topic에서 가져온 메시지를
로그로 출력만 하고 있다.


ProducerConsumer1 (DATANODE1)

Consumer2(DATANODE2)Consumer3 (DATANODE3)

현재 구성이 topic이 3개의 partition으로 되어있고 3개의 Consumer가 하나의
Consumer 그룹(kaka-node-group : kafka-node에서 자동 생성한 그룹)에
3개의 Consumer가 있기 때문에 각각의 partition과 Consumer는 다음과 같이
1:1로 연결이 되고 있다.


Consumer1 - partition 0
Consumer2 - partition 2
Consumer3 - partition 1


다음 포스팅에서는 현재 내가 구성하고 있는 시스템의 구조를 설명하고
Consumer 중단 시에 rebalancing이 일어나는 모습을 코솔 로그를 통해
확인해보겠다.

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^

Distribution

topic의 partition들은 Kafka 클러스터를 구성하는 서버들에 분산 저장이 된다.
partition들은 내고장성을 위해 여러 서버에 복제되며 복제되는 서버의 수를 설정
할 수 있다.


각 partition은 1대의 leader 서버와 0대 이상의 follower 서버들로 구성된다.

(즉, leader 서버 1대로도 Kafka 사용이 가능하다.)

leader는 읽기 쓰기가 모두 가능하고 follower들은 leader의 데이터를 복제한다.


만일 leader가 고장나면 follower 중 한 대를 leader로 선출한다.
이런 구조로 클러스터 내에서의 부하가 적절히 분산된다.


지난 포스팅 (Apache kafka 시작하기)에서의 기억을 더듬어보면
일단 클러스터 내의 복제 서버는 총 5대로 설정되어있다. zookeeper의
server.0 ~ server.4 설정과 Kafka server.properties에서의
zookeeper.connect 설정에 맥미니 5대의 host name이 설정된 것을
확인 할 수 있다.


그리고 이 상태에서 다음과 같이 토픽을 생성할 때 처음에 오류가 발생을 했었다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --create --zookeeper SECONDARY-NAMENODE.local:2181 --replication-factor 5 --partition 20 --topic test 


원인은 SECONDARY-NAMENODE.local은 leader가 아니었고 그렇기 때문에
복제만 가능하고 write가 불가능했던 것이다. 
결국 leader인 NAMENODE.local을 이용하여 topic 생성에 성공했다.


그리고 이렇게 생성된 topic은 5대의 서버의 로그 디렉토리 
(server.propertiesd에 log.dirs로 설정된 위치 - 나의 경우 /tmp/kafka-logs)에
다음과 같이 뭔가 생성된 것을 확인할 수 있었다.



Producer

Producer는 선택한 topic으로 데이터를 발행한다.
Producer는 메시지를 발행할 topic을 선택하는 책임을 지는데
전통적인 round-robin 방식을 이용하여 균등하게 발행을 하거나
아니면 메시지의 어떤 키를 이용하여 특정 partition에는 특정 키를 가진
메시지만을 보낼 수 있다.

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^

Apache Kafka 개요


지난 글에서 kafka설치 및 설정, 그리고 서버 기동과 간단한 테스트를
진행해 보았다.


오늘은 kafka의 소개 내용을 간단하게 요약해보겠다.
이미 많은 블로그에 원문에 대한 번역에서부터 심층 분석까지 다양한
자료들이 포스팅 되어있으니 나는 그냥 개요만 짚어보련다~


kafka 공식 홈페이지의 indroduction을 보면 다음과 같은 내용으로 시작한다.

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.


뭐 거의 동어 반복이다. 로그 저장에 대한 분산과 분할과 복제. 그리고 메시징 시스템인데
독특하게 디자인 되었단다. 일단 분산, 분할, 복제에 돼지꼬리 땡땡 하고...


조금 더 구체적으로 가보자

  • 메시지는 topic이라고 불리는 일종의 카테고리를 통해 관리된다.
  • producer를 통해 Kafka의 topic으로 메시지를 발행한다.
  • consumer를 통해 topic으로부터 생성된 메시지들을 구독한다.
  • Kafka는 broker라고 불리는 하나 이상의 서버 클러스터로 실행된다.

Kafka 구성1

지난 번 포스팅의 테스트 과정과 비교하면서 살펴보자
  • “kafka-topics.sh --create…”를 통해서 test라는 이름의 topic을 생성.
  • “kafka-console-producer.sh --broker-list…”를 통해 메시지를 생성.
  • “kafka-console-consumer.sh --zookeeper…”를 통해 메시지를 확인.

broker에 대해서는 2번째 kafka-console-producer.sh 진행 시 --broker-list라는
옵션을 통해 메시지를 broker로 전달했다는 것을 알 수 있다.


이런 과정들 (클라이언트와 서버간의 통신)는 단순하고, 고성능이며 언어에 구애받지 않는
TCP 프로토콜을 통해 이루어진다.


또한 Kafka에서는 기본적으로 java 클라이언트를 제공하지만 다양한 언어들을 이용하여
클라이언트를 구현할 수 있다. (다음 링크에서 사용 가능한 클라이언트 언어를 확인할 수
있다 : https://cwiki.apache.org/confluence/display/KAFKA/Clients )


각각의 개념에 대해 좀 더 상세하게 알아보자


Topic과 로그

토픽은 Kafka 클러스터에 의해 partition으로 분할되어 관리된다. 
아래 그림과 같음)


토픽 개념도2


각 파티션에는 메시지들이 순차적으로 쌓인다.
이 메시지들은 정렬되어 저장되며 각각을 구분할 수 있는 offset이라는 ID가 
부여된다.


이렇게 쌓인 메시지들은 사용되었는지 여부와 상관 없이 일정 시간동안 유지된다.
이런 기능들은 데이터 크기를 효율적으로 사용할 수 있게 해준다.

유지 시간은 Kafka설정에이 있으며 기본값은 168(7일)시간이다
설정 항목은 log.retention.hours이다.


각각의 consumer에도 offset이라고 하는 메타데이터가 있어 메시지의
offset과 관련하여 작동한다.


partition 분할을 통해 데이터 용량의 확장과 병렬 처리의 잇점을 얻을 수 있다.

  1. 이미지 출처 : http://kafka.apache.org/documentation.html#introduction ↩︎
  2. 이미지 출처 : http://kafka.apache.org/documentation.html#introduction ↩︎


블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^


이제야 발견한 Kafka


하던 일도 제대로 못하면서 빅데이터 공부해보겠다고 꼴깝을 떤 것이 
벌써 2013년 5월달 이야기네…


뭔가 새로운 것에 대해서는 남들 앞에서 한 마디나마 
거들 수 있어야 하지 않을까 하는 밑도 끝도 없는 초조감이
나를 뻘짓거리의 함정으로 이끌었다.


뭔가를 시작하기 전에는 지름신을 영접하는 것이 당연한(?) 의례인지라
이 때도 지름신을 조금 과하게(…ㅠ.ㅠ) 영접했다.


맥미니 5대…


그리고는 한 동안은 신났다.
Hadoop 설치하고 zookeeper 설치하고 Hbase 설치하고…
그리고…샘플 한 번 돌려보고? 끝이었나?…ㅠ.ㅠ


목표로 삼았던 것이 twitter의 데이터를 수집해서 이것 저것 분석하는
공부를 좀 해보고자 했는데…
이게 당최 감이 안잡히는 것이다.


twitter API를 통해 데이터를 가져오는 것이야 알겠는데…
이걸 어떻게 저장을 해야하는지…


처음에는 그냥 REST API를 이용해서 데이터를 가져오고
무작정 일반 txt 파일로 저장을 했다.
그러다가 Streaming API를 이용하게 되었는데 아무리 샘플 데이터라
하더라도 연속적으로 들어오는 데이터를 어떻게 저장을 해야 할지,
그리고 간단하게나마 가공을 하고자 하는데 가공을 하는 중 계속해서
들어오는 데이터는 어떻게 처리를 해야 하는지에 대한 해결책을 
전혀 몰랐다.



그렇게 3년이 지나버린 것이다.
그리고 우연히 kafka를 발견한 것이다.
프란츠 카프카도 아니고 해변의 카프카도 아니고 apache Kafka!


일단 2가지 측면에서 관심을 가지게 되었는데 하나는 Use Case 중 
Stream Processing이고 다른 하나는 Client에 node.js가 있다는 점이었다.


처음부터 twitter API를 이용하는데 node module을 사용한지라 계속해서
node module을 사용할 수있다는 것은 나에게는 큰 매리트였다.



아직은 대충 훑어본 정도라 과연 나의 목적에 딱 맞는지는 잘 모르겠으나
이 부분은 계속 적용을 해 나가면서 구체적으로 정리를 해야겠다.


당분간 주말에 할 것이 생겨서 즐겁네

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^

티스토리 툴바