Cluster : The Beginning - Raspberry pi에 Apache Kafka 설치하기


이전 작업으로 일단 기본적인 출발은 마무리가 되었다.
하지만 EMQ를 설치한 내용에서 언급했듯이 MQTT는 경량화 프로토콜로 주로 IoT에 특화되어 있다고 볼 수 있어
(Facebook Messanger에서 MQTT를 사용한다고 하는데 현재형인지 또 어떤 영역에 어떻게 사용하는지는 잘
모르겠다) 아직은 절반의 성공일 뿐이다.


센서 데이터 분석을 위한 환경 뿐만 아니라 일반적인 데이터 분석에 대한 환경을 갖추기 위해서는 MQTT라는 진입점
만으로는 부족한 것이다. 그래서 일반 데이터를 수집하는 부분은 예전에 한 번 시도를 해보았던 Apache Kafka를
이용하기로 했다. Kafka를 이용해 수집할 데이터도 예전과 마찬가지로 트위터의 데이터가 될 것이다. 다만 클라이언트는
예전에는 Node.js를 이용하여 구현했으나 이번에는 다른 방식을 찾아볼 생각이다.


이번 포스팅은 바로 이 Apache Kafka를 라즈베리파이에 설치하고 구동하는 과정을 정리해보겠다.
사실 2016년도에 정리한 내용의 축약 버전이나 다름없어 마지막 정리에 2016년에 포스팅한 내용을 모두
링크하였으니 참고하시길 바란다.


개요 - Kafka는 MQTT와 뭐가 다른가?


일단 Kafka 역시 Message Queue기반의 시스템이다. 용어의 차이는 있지만 대체로 구성이 비슷하다.




아주 심플하고 직관적인(하지만 정확하진 않은…-.-) 비유를 들자면 다른 Message Queue 시스템을 퀵서비스라고
한다면 Kafka는 택배라고 할 수 있을 것이다. 퀵서비스는 작은 물건이나 서류를 송신자로부터 수신자 에게 직접 전달을
해주지만 택배는 큰 덩치의 물건들을 물류창고에 집하했다가 다시 배송을 한다. 하지만 이 것은 어디까지나 간단한 비유고
자세한 차이점은 아래 블로그에 잘 정리가 되어있다.

http://epicdevs.com/17


일단 Kafka는 용량이 큰 데이터 전송에 유리하고 클러스터를 통해 데이터를 ‘복제’해둘 수 있으며, Message Queue가
broker에서 subscriber로 topic을 push해주는 반면 kafka는 consumer가 필요할 때 broker로부터 pull 방식으로
가져다 쓸 수 있다는 차이 정도만 알아두면 될 것 같다.


zookeeper 설치, 설정, 실행


개요에서 설명한 것과 같이 Kafka의 경우 클러스터를 구성하여 분산처리를 할 수 있으며 전송되는 데이터를 여러 노드에
복제해놓을 수도 있다. 하지만 이러한 분산 처리를 하기 위해서는 zookeeper라는 분산 코디네이터의 도움을 받아야
한다. 즉, Kafka를 사용하기 위해서는 zookeeper를 먼저 설치해야 한다는 뜻이다.


라즈베리파이에 zookeeper 설치는 매우 간단해서 그냥 바이너리 배포판을 다운로드 받아 적당한 위치에 압축을
풀고 환경에 맞게 설정한 후 실행을 하면 된다…-.-


나는 일단 3.4.10 버전을 받아서 /opt/zookeeper에 압축을 풀었다.
설정은 딱 3가지만 하면 된다.

#데이터를 저장할 디렉토리를 설정한다.
dataDir=/var/lib/zookeeper

#간단하게 기본 설정 파일에서 주석만 풀어주면 된다. 
#주석 처리되어있으면 서버간 통신 때 connection refused가 발생한다.
maxClientCnxns=60

#zookeeper의 클러스터는 별도로 앙상블이라고 불리우는데 앙상블을 구성할 서버 주소를 적어준다.
server.0=172.30.1.54:2888:3888
server.1=172.30.1.13:2888:3888
server.2=172.30.1.42:2888:3888


zookeeper 앙상블이 정상적으로 실행되기 위해서는 dataDir에 지정된 경로에 myid 파일이 필요하며 이 파일에는
3번째 서버 설정에서 정의된 서버 ID가 적혀있어야 한다. 위 설정을 기준으로 보자면 server.0 서버에는 0이,
server.1 서버에는 1이, server.2 서버에는 2가 적혀 있어야 한다.


앙상블(클러스터)을 구성하는 모든 서버에 동일한 설정을 해주고 나서 각 서버에서 아래와 같이 zookeeper 서버를
실행해준다.

$ $ZOOKEEPER_HOME/bin/zkServer.sh start


한가지 주의할 사항은 클라이언트의 요청을 처리하는데 있어서 leader 역할을 하는 한 대의 노드에서만 읽기와 쓰기가
모두 가능하다. follower에서는 오직 읽기만 처리 가능하며, 만일 쓰기 요청이 오면 각 follower 노드들은 그 요청을
leader 노드에 위임하게 된다.


어느 서버가 leader고 어느 서버가 follower인지는 zookeeper에서 확인 가능한데 다음 명령어로 확인 가능하다. 
아래는 현재 노드가 follower임을 보여준다.




zookeeper에 대한 기본적인 내용은 이 것이 전부다. 하지만 분산 시스템을 관리한다는 본연의 임무를 생각해본다면
zookeeper에 대해 알아야 할 내용은 상당히 많다. 또한 zookeeper API를 이용하면 zookeeper를 통해 관리되는
분산 시스템을 직접 만들 수도 있다. 한마디로 zookeeper에 대한 것만 공부하자고 해도 상당히 많은 시간이 필요하므로
여기서는 이정도로 마무리 하겠다.


Kafka 설치


지금껏 진행해온 다른 시스템 설치와 마찬가지로 설치 자체는 매우 간단하다. 바이너리 배포본을 다운로드 한 후
압축을 풀고, 설정하고, 실행하면 된다.


나의 경우 일단 Kafka는 2.11-1 버전을 다운로드 받았고 /opt/kafka에 압축을 풀었다.




Kafka 설정


Kafka의 설정 파일 위치는 다음과 같다. 나는 /opt/kafka에 설치를 했으니 /opt/kafka/config 아래에 있는
server.properties를 수정하면 된다.




이전과 마찬가지로 반드시 설정해야 할 내용만 정리해보자.

#앞서 zookeeper 설정에서 설명한 서버 아이디를 적어준다. 여기는 172.30.1.54 서버이므로
#server.0=172.30.1.54:2888:3888 설정을 참고하여 0으로 설정하였다.
broker.id=0

#로그를 기록할 경로를 지정한다.
log.dirs=/var/lib/kafka-logs

#topic을 저장할 파티션을 몇개로 할 지 지정한다. 서버가 3대이니 일단 3으로 지정해보았다.
#이렇게 설정하면 하나의 데이터 파일이 3개로 쪼개져서 저장이 된다.데이터 파일이 Topic으로 들어오는 데이터가       #3영개의 영역으로 나뉘어서 저장이 된다.
#하지만 partition이란 하나의 Topic을 몇 개로 나눌지를 결정하는 것이지 #물리적 서버의 댓수와는 상관이 없다. num.partitions=3 #데이터 파일의 복제본을 몇개나 가지고 있을지 설정한다. 3으로 설정했으니 3개의 복제본이 존재하게 된다. offsets.topic.replication.factor=3 #클러스터 관리를 위한 zookeeper 서버 목록을 적는다. zookeeper 설정에서는 IP 주소로 설정했는데 #여기서는 host 이름으로 설정하여 일관성이 없는 설정이 되긴 했지만...-.- #각 서버는 다음과 같이 매핑되니 참고하시길 바란다. #rpi1=172.30.1.54, rpi2=172.30.1.13, rpi3=172.30.1.42 zookeeper.connect=rpi1:2181,rpi2:2181,rpi3:2181


위 내용만 설정하면 kafka 서버를 실행할 수 있다.


마지막으로 num.partitions과 offsets.topic.replication.factor 설정이 어떻게 반영되는지에 대해 아래와 같이
그림으로 간단하게 표현을 할 수 있다.




위 그림을 설명하자면 다음과 같은 구조의 경우라 볼 수 있다.

  • 3개의 노드 
  • 3개의 파티션(num.partitions=3)
  • 3개의 복제본(offsets.topic.replication.factor=3)


여기서 주의해서 볼 것은 leader와 follower로 항상 leader에만 쓰기가 가능하며 leader에 데이터가 기록되면
기록된 데이터가 follower로 복제된다는 것이다.


보다 상세한 내용은 좀 더 공부를 한 후 다시 정리해보도록 하겠다.


실행


실행은 /opt/kafka/bin경로로 이동하여 다음과 같이 입력하면 된다.

$ ./kafka-server-start.sh ../config/server.properties


만일 JMX를 이용한 모니터링 도구를 이용하고자 한다면 다음과 같이 실행한다.

$ env JMX_PORT=9000 ./kafka-server-start.sh ../config/server.properties


이렇게 실행을 한 후 아래 2개의 모니터링 도구를 사용할 수 있다.

Kafka Offset Monitor : http://quantifind.github.io/KafkaOffsetMonitor/

Kafka Manager : https://github.com/yahoo/kafka-manager


주의 사항


2016년도에 Kafka를 설치할 때는 맥미니에 설치하였지만 이번에는 라즈베리파이에 설치를 하였다.
이 차이는 결코 작은 차이가 아니다. 하드웨어 사양으로 인해 라즈베리파이에서는 사용할 수 있는 자원이
매우 한정적인 것이다. 실제로 라즈베리파이에서 kafka를 실행했을 때 잦은 빈도로 메모리 부족 현상이
발생을 하였다. 따라 일단 라즈베리파이에서 안정적으로 실행을 하기 위해서는 kafka-server-start.sh 파일의
다음 부분을 찾아 Xmx와 Xms를 적절히 수정해주어야 한다. 기본 값은 1G이다.

export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"



정리


사실 라즈베리파이에서 설치 및 구동을 한 것 외에는 2016년도에 정리했던 내용과 별 다른 점이 없다.
다시 정리한다는 것이 중복된 작업일 수도 있으나 복습한다는 의미로 한 번 더 정리해보았다.
그런 의미에서 가장 중요한 부분은 ‘주의 사항’이 아닐까 싶다.


가장 기초적인 설치,설정,실행 부분만 짚고 넘어갔으니 2016년도의 Kafka 관련 모든 글을 링크하면서
이번 포스팅을 마칠까 한다.











블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^






Cluster : The Beginning - Raspberry pi에 HAProxy 설치하기


지난시간 까지 2대의 라즈베리파이에 EMQ를 클러스터링하여 설치하는 작업을 진행하였다.
그런데 한 가지 문제가 생겼다. 일반 가정집에서 공유기를 통해 접속을 하다보니 2대 중 한대로만 접속이 이루어진다는
점이다. 혹시나 해서 공유기의 트래픽 관리에 외부 포트는 동일하게, 내부 서버는 서로 다르게 설정을 해보아도 역시나
가장 마지막에 등록된 정보로만 통신이 이루어졌다.


이렇게 되면 세션은 공유가 되지만 만약 외부에서 접속되는 한 대의 서버가 다운되면 전체 시스템을 사용할 수 없게
되는 것이다. 결국 접속을 분산시킬 필요가 생겼다. 그리고 그렇게 해서 HAProxy라는 솔루션을 찾아내었다.


더불어 애초에 EMQ 2대 Apache Kafka 3대로 클러스터를 구성할 생각이었는데 부랴부랴 HAProxy를 위한 
1대의 라즈베리파이를 추가로 구입하여 총 6대의 클러스터가 되었다.



Clustering과 Load Balancing


일반적으로 성능 향상을 위해 여러대의 서버에 동일한 구성을 하여 함께 운영하는 경우가 많다.
그리고 이렇게 구성하는 경우 Clustering이라든가 Load Balancing이라는 용어가 많이 쓰이는데 과연 
이 둘의 차이는 무엇일까?


가장 단순하게 표현하자면 Clustering은 여러대의 컴퓨터 그룹이 같은 정보를 가지고 함께 일을 하는 것이고
Load Balancing이라는 것은 각각의 컴퓨터가 공평하게 일할 수 있도록 일감을 나누어 주는 것이라고 생각하면
되겠다. 이렇게 정의할 때 Clustering은 같은 정보에 방점이 찍히고 Load Balancing은 나누어 주는
방점이 찍히게 된다. 즉, Clustering은 그룹을 지어 함께 일을 하는 서버간의 관계에 대한 개념이고 Load Balancing은
클라이언트에서 서버로 접속하는 과정에서의 처리에 대한 개념이라고 보면 되겠다.

 



여러 대의 컴퓨터를 대상으로 한다는 것 외에는 서로 다른 개념이고 종속관계를 나눌 수는 없지만
최근 Clustering을 지원하는 솔루션들은 내부적으로 Load Balancing을 함께 지원하는 경향이 있다. 하지만 
Load Balancing을 위한 솔루션이 Clustering을 지원하는 경우는 거의 없다.


예를 들어 1대의 서버로 서비스를 제공할 경우 그 서버가 다운되면 서비스 자체를 제공하지 못하므로 2대의 서버에 
각각 Tomcat을 설치하여 동일한 서비스를 제공하고 싶다고 하자. 각각을 Tomcat1, Tomcat2라고 했을 때
Tomcat1로만 또는 Tomcat2로만 접속이 가능하다면 2대의 서버를 구성한 의미가 없어진다. 그래서 서버 앞단에
Load Balancing을 위한 장비를 두고 적절한 알고리즘을 통해 각각의 클라이언트들이 Tomcat1과 Tomcat2에
적절하게 분산되어 접속하도록 조정을 해주는 것이다.


그런데 Load Balancing만 하는 경우에는 문제가 발생할 수 있다. 아래 그림을 보면 client1과 client2가 Load 
Balancer를 통해 각각 Tomcat1과 Tomcat2로 적절하게 나누어 요청을 수행하고 있다. client1은 Tomcat1에
문서 파일인 B를 업로드 했고 client2는 Tomcat2에 그림파일인 A를 업로드 했다. 그런데 Tomcat2 서버에
장애가 발생하여 접속이 불가능하게 되면 Load Balancer는 이후 모든 접속을 정상 작동 중인 Tomcat1로 보낸다.
이 때 client2의 입장에서는 Load Balancer 이후의 구조에 대해서는 알지 못하므로 분명 자신은 그림 파일 A를
서버에 업로드 했는데 그 업로드한 파일을 찾을 수 없게 된다.




이러한 문제 때문에 보통 Load Balancing을 하면 Clustering도 함께 고려를 해야 한다. 물론 간단하게 Tomcat은
이중화를 하되 DB는 1대만 설치하여 중요 정보만 공유하는 것도 고려해볼 수 있을 것이다.


결론적으로 Clustering과 Load Balancing은 다음과 같이 구분지어 볼 수 있을 것이다.



HAProxy


EMQ의 경우 clustering을 통해 상태 공유는 되지만 클라이언트와 접속하는 지점에서의 load balancing은 별도의
작업을 해주어야 한다. 일반적으로 널리 알려진 load balancing 장비는 L4 Switch라는 하드웨어 장비가 있다.
하지만 개인이 그런 비싼 네트워크 장비를 사용할 수도 없을뿐더러 사용할 필요도 없다…우도살계인 격이다.


그러다가 발견한 것이 HAProxy라는 솔루션이다. S/W적으로 load balancing을 지원해주는 솔루션이었다.
홈페이지는 다음과 같다.

http://www.haproxy.org


사실 HAProxy는 전체 구성요소에서 고려하고 있지 않던 부분이라 아주 기초적인 부분만 확인했다.
자세한 내용은 HAProxy로 검색하면 많은 정보를 얻을 수 있으니 여기서는 간단하게 설치와 설정 그리고
모니터링 웹 콘솔에 대해서만 살펴보겠다.


설치


역시나 소스를 컴파일하는 설치 방법이 많이 검색되는데 나는 심플하게 apt-get으로 설치하였다.


$sudo apt-get install haproxy 


홈페이지에는 현재 최종 릴리즈 버전이 1.9인데 apt-get으로 설치하니 1.7.5-2 버전이 설치되었다.
이렇게 설치하고 나면 /etc아래에 haproxy 디렉토리가 생성되고 그 아래 설정파일인 haproxy.cfg가
위치한다.


설정


HAProxy의 설정은 몇개의 영역으로 구분되는데 간단하게 살펴보면 다음과 같다.


  • global : 이름 그대로 전역 변수에 대한 설정으로 로그, 상태, 소유자 및 소유 그룹 SSL 관련 설정들이 있다.
  • default : 아래 내오는 세부 설정에서 명시적으로 추가하지 않은 설정은 이 default 설정을 따른다.
  • frontend : 클라이언트가 접속하는 것과 관련된 설정이다. 기본적으로 외부에서 접속할 포트 정도 지정한다.
  • backend : frontend와 짝을 지어 설정하며 frontend 설정을 통해 접속한 클라이언트가 실제 요청을 보낼 서버를 지정한다.
  • listener : frontend와 backend를 한번에 설정할 때 사용한다. 주로 TCP 연결에 사용한다.


각 설정의 뒤에는 설정을 대표하는 이름을 적게 된다. 그 이름은 모니터링 화면에 표시하여 구분할 수 있도록 한다.
앞서 EMQ에서와 마찬가지로 내가 설정한 부분만 간단하게 살펴보자. global과 default에서는 유의미한 수정이
없으므로 frontend와 backend 그리고 listener 설정만 보도록 하겠다.


#frontend와 backend 설정은 EMQ의 모니터링을 위한 웹 접속 설정이다.
#외부에서 8080포트로 접속을 하게 되면 backend에 설정된 172.30.1.9 서버와 172.30.1.25 서버의
#18083 포트로 접속을 하게 되는데 이 때 클라이언트를 두 서버로 분배하는 알고리즘은 roundrobin이다.
#mode는 프로토콜을 설정하는 항목인데 여기서는 지정을 안했기 때문에 default 설정에 있는 http 접속을
#처리한다.
frontend http-in 
	bind    *:8080
	default_backend servers

backend servers
	balance roundrobin
	server server1 172.30.1.9:18083 maxconn 2048
	server server2 172.30.1.25:18083 maxconn 2048

#아래 설정은 클라이언트로부터 EMQ boker에 접속하기 위한 설정이다.
#HAProxy를 사용하게 된 것도 바로 이 설정이 필요했기 때문이다.
#외부에서 1883 포트로 접속을 하게 되면 두 서버의 1883 포트로 접속이 분배되며 역시
#분배 알고리즘은 roundrobin이다. 프로토콜을 설정하는 mode는 tcp로 설정한다.
listen  tcp-in
        balance roundrobin
        bind    *:1883
        log     global
        mode    tcp
        option  tcplog
        server  mtqqserver1     172.30.1.25:1883
        server  mtqqserver2     172.30.1.9:1883

#아래 설정은 위 설정과 동일한데 개인적으로 위 내용은 모바일이나 IoT로부터의 접속에 사용하기 위해
#설정하였고 아래 내용은 kafka에서 EMQ broker의 메시지를 subscribe하기 위해 따로 설정하였다.
listen  mqtt-kafka
        bind    *:1884
        mode    tcp
        option  clitcpka
        timeout client 3h
        timeout server 3h
        option  tcplog
        balance leastconn
        server  mtqqserver1     172.30.1.25:1883        check
        server  mtqqserver2     172.30.1.9:1883         check

#이 설정은 HAProxt의 모니터링을 위한 웹 콘솔에 접근하기 위한 설정이다.
#이 설정의 이름은 stats로 정해져 있으며 uri의 경우 주소:포트 뒤에 따라올 문자열로
#임의로 설정하면 된다.
listen  stats
        bind    *:8081
        mode    http
        stats   enable
        stats   uri     /haproxy?stats
        stats   auth    guest:guest
        stats   refresh 5s


기본적으로 이정도만 해도 원하는 바는 얻을 수 있다. 좀 더 세밀한 설정을 위해서는 보다 많은 내용을 추가해야 하지만
앞서 말했듯이 HAProxy는 가볍게 지나가도록 하겠다.


실행


실행과 종료는 간단하게 다음과 같이 하면 된다.


$/etc/init.d/haproxy start
$/etc/init.d/haproxy stop


설정이 바뀌면 다음과 같이 재시작 한다.


/etc/init.d/haproxy restart

모니터링


위 설정에서 이미 모니터링을 위한 웹 콘솔에 대한 설정은 마친 상태이다. 설정에 적힌 포트로 접속만 하면 된다.
설정에 따라 브라우저에 아래 주소를 입력하면 모니터링 화면이 뜬다.


http://localhost:8081/haproxy?stats




테스트


별다른 스크린샷은 올리지 않겠지만 지난 번 포스팅에 설명했던 MQTT 클라이언트로 connect와 disconnect를
반복해보면 EMQ 모니터링 화면의 Clients 메뉴에 한 번은 172.30.1.9 서버로, 한 번은 172.30.1.25 서버로
번갈아 접속되는 모습을 볼 수 있다.


정리


이렇게 해서 갑작스럽게 설치하게 된 HAProxy에 대한 정리를 마친다.
물론 집에서 개인 프로젝트용으로 구성하는 서버에 뭘 이런 것까지 설치해야 하나 싶지만 기왕에 다수의 장비를
구성하게 된 김에 곁들여서 한 번 설치를 해보았고 나름 작동하는 모습을 보니 재미는 있다^^;;;

다음 포스팅에서는 나머지 3대의 라즈베리파이에 Apache Kafka를 설치하고 설정 및 실행시키는 과정을
정리해보도록 하겠다.







블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^



Kafka 무작정 실행하기 - 2


58의 비밀

먼저 지난 번 마지막에 언급했던 58이란 숫자의 비밀을 밝혀보자.
사실 정확한 원인은 아직 확인 못했다. 다만 지난 번 코드의 구현이 ’트위터
메시지가 하나 들어올 때마다 producer 하나를 만들어 트위터 메시지를
topic에 보낸다’
는 것이었다.


이 과정에서 의심할 수 있는 것은 매번 producer를 만들어 커넥션울 하게 되니
아마도 이 커넥션 수에 제한이 걸려버린 것이 아닐까 하는 부분이었다.


그래서 일단 직감에 의존해 producer에서 topic으로 메시지를 보낸 후
API의 close 함수로 연결을 끊어보았다. 예상이 적중하였는지 이후로는
58개의 제한에 걸리지 않고 트위터에서 받아오는 모든 메시지들이
정상적으로 전송이 되었다.



성능 관리

겨우 이만큼의 구현과 설정을 해놓고 성능 관리를 논하는 것은 우스운 일이지만
일단 트위터 메시지 한 건에 커넥션 하나를 만들었다가 끊는다는 것은 아무리
생각해도 그리 바람직하지는 않다고 판단하였다. 


그리서 우선 아쉬운대로 트위터 메시지 10건이 수신되면 그 10개를 하나의
배열로 하여 producer를 통해 보내도록 하였다. 우선은 시험 삼아
이정도 처리를 한 것이고 시스템 환경에 따라 아마도 이러한 수치를 세밀하게

조절을 해야 할 것이다.


그리고 실제로 topic으로 메시지를 보내는 위치에서 파티션(나는 3개의 
partition 나뉜 1개의 topic을 만들었다)에 고르게 분산시키기 위해
파티션의 인덱스를 계산하는 식을 하나 추가하였다.


  • 처음에는 손이 덜가는 HighLevelProducer를 사용했었으나 뭔가 잘
    작동하지 않는 듯하여 그냥 Producer로 바꾸었다.

수정된 소스

일단 지난 시간의 문제점을 수정하고 몇가지 필요한 내용을 추가한
소스를 다음과 같이 만들었다.

var kafka = require('kafka-node');


var Twitter = require('twitter');
var count = 0;


var client = new Twitter({
  consumer_key: ‘…’,
  consumer_secret: ‘…’,
  access_token_key: ‘…’,
  access_token_secret: ‘…’,
});

var msgArr;

/**
 * Stream statuses filtered by keyword
 * number of tweets per second depends on topic popularity
 **/
client.stream('statuses/sample', {track: ''},  function(stream){
  stream.on('data', function(tweet) {
    if (tweet.lang == 'kr' || tweet.lang == 'ko') {
      if (count % 10 == 0) {
        msgArr = new Array();
      }

      var messageObj = {};

      messageObj.createdAt = tweet.created_at;
      messageObj.id = tweet.id;
      messageObj.idStr = tweet.id_str;
      messageObj.text = tweet.text;
      messageObj.UserScreenName = tweet.user.screen_name;

      messageObj.retweetCreatedAt = '';
      messageObj.retweetId = '';
      messageObj.retweetIdStr = '';
      messageObj.retweetText = '';
      messageObj.retweetUserScreenName = '';
      messageObj.retweetCount = '';

      if (tweet.retweeted_status) {
        messageObj.retweetCreatedAt = tweet.retweeted_status.created_at;
        messageObj.retweetId = tweet.retweeted_status.id;
        messageObj.retweetIdStr = tweet.retweeted_status.id_str;
        messageObj.retweetText = tweet.retweeted_status.text;
        messageObj.retweetUserScreenName = tweet.retweeted_status.user.screen_name;
        messageObj.retweetCount = tweet.retweeted_status.retweet_count;
      }

      var messageStr = JSON.stringify(messageObj)
      console.log(messageStr);
      console.log('===================================================================' + count);

      msgArr.push(messageStr);

      if (count % 10 == 9) {
        console.log('==call sendMessage====================================================');
        var Producer = kafka.Producer;
        var Client = kafka.Client;
        var client = new Client('NAMENODE.local:2181,SECONDARY-NAMENODE.local:2181,DATANODE1.local:2181,DATANODE2.local:2181,DATANODE3.local:2181');

        var topic = 'twittopic';
        var producer = new Producer(client, { requireAcks: 1 });

        sendMessage(producer, msgArr, count);
      }
      count++;
    }
  });

  stream.on('error', function(error) {
    console.log(error);
  });
});

var sendMessage = function(producer, msgArr, count) {

  console.log('==sendMessage=================================================================');

  producer.on('ready', function () {
    console.log('==producer ready=========');
    console.log('send start');

    var p = (count - 9) % 3;
    producer.send([
      {topic: 'twittopic', partition: p, messages: msgArr, attributes: 0 }
    ], function (err, data) {
      if (err) console.log(err);
      else console.log('send %d messages', count);

      producer.close(function() {
        console.log('close produce');
      });
    });
  });


  producer.on('error', function (err) {
    console.log('error', err)
  });
};

모니터링

일단 내용 구현을 하고 실제 작동하는 것 까지 확인을 하였으니 이제제대로
뭔가를 하고 있는 것인지 궁금해졌다. 일단 Kafka 자체적으로는 달리 웹으로 
구현된 관리 화면을 제공하지 않기에 잘 알려진 모니터링 도구 2가지를 
설치해보았다.


Kafka Offset Monitor : http://quantifind.github.io/KafkaOffsetMonitor/

Kafka Manager : https://github.com/yahoo/kafka-manager


이중 Kafka Manager는 Kafka에서 제공하는 JMX 인터페이스를 이용하는
툴로 이 툴을 이용하기위해서는 Kafka 서버 시작 시 JMX 포트를 함께 지정해
주어야 한다.

env JMX_PORT=9999 ./kafka-server-start.sh ../config/server.properties


다음은 각 모니터링 툴의 실제 화면이다.

  • Kafka Offset Monitor


  • Kafka Manager

그리고 다음은 node.js를 통해 실행한 Producer와 Consumer의 콘솔 화면이다.

아직 Consumer에서는 별다른 메시지 가공은 없이 topic에서 가져온 메시지를
로그로 출력만 하고 있다.


ProducerConsumer1 (DATANODE1)

Consumer2(DATANODE2)Consumer3 (DATANODE3)

현재 구성이 topic이 3개의 partition으로 되어있고 3개의 Consumer가 하나의
Consumer 그룹(kaka-node-group : kafka-node에서 자동 생성한 그룹)에
3개의 Consumer가 있기 때문에 각각의 partition과 Consumer는 다음과 같이
1:1로 연결이 되고 있다.


Consumer1 - partition 0
Consumer2 - partition 2
Consumer3 - partition 1


다음 포스팅에서는 현재 내가 구성하고 있는 시스템의 구조를 설명하고
Consumer 중단 시에 rebalancing이 일어나는 모습을 코솔 로그를 통해
확인해보겠다.

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^

얼마 전 연봉 계약도 싸인을 했지만
누군가에게 고용 되어, 일을 하며, 임금을 받는 일련의 과정이
때로는 참을 수 없을만큼 불합리하고 불공평하게 느껴진다.


나는 내가 일한 만큼 대가를 받고 있는 것인지?
내가 받는 임금은 내가 생산한 가치를 온전히 반영한 것인지?
또 다른 누군가는 그러한지...


온갖 학설과 이론으로 중무장한 현대 자본주의 경제는 아마도
현재의 상태를 매우 합리적으로 설명을 해내겠지만
그런가보다 하다가도 어느 순간 다시 이 의문으로 돌아오는 것은
여전히 풀리지 않는 무언가가 있는 것이리라 
(그 무언가가 나의 무식이라면 할 말 없고...)



도대체 내 임금은 왜 이만큼인거야?


사실 일반인들은 자신이 일을 해서 만들어진 가치가 얼마만큼의 비중을
가지고 있는지 전혀 모를 것이다. 아니 이 세상 사람들 누구도 모를 것이다.
임금이 어떻게 결정되는지는 잘 모르지만 생산된 가치에 대한 기여도가
기준이 아니라는 것 알 수 있을 것 같다.


두산 백과사전 기준으로 임금의 결정 요인을 살펴보면 대략 다음과 같은 요소들이
영향을 미친다.


  • 경제성장 정도
  • 노동력 수급 관계
  • 물가상승
  • 생산성


세사상에나!


’경제성장 정도’라니...똑같은 설게도를 바탕으로 건물을 짓는다고 할 때 한국의 잡역부가
시멘트 한 포대를 옮기고 받는 돈과 미국 잡역부가 한 포대를 옮기고 받는 돈이 한국은
못살고 미국은 잘산다는 기준으로 정해진다는 것이 가당키나 한 것인가?


나 말고도 일할 사람들이 많기 때문에 내 임금이 보잘것 없어지는 것은 더더욱 용납이
안된다.


근본도 모르는 물가상승 따위에 임금이 영향을 받는다는 것도 역시...


마지막에 생산성이라는 요소가 있기는 하지만 이는 기업과 국가, 사회 규모에서의 생산성
문제로 납득할 만한 근거를 제시해주지는 못한다.


내 월급이 왜 이만큼인지는 아무도 대답을 해 줄 수 없을 것 같다...ㅠ.ㅠ



그냥 인류의 한계라고 치자...-.-


나는 착하니까! 그리고 맹자를 존경하고 성선설을 믿으니까!
그냥 어떤 부류의 사람들이 의도적으로 이상한 이론을 만들어서 지들은 많이 갖고
우리한테는 조금 주고 뭐, 막 그러는 거다...라는 소리는 완전히 제쳐두자.


그냥 막, 뭐 세상사람들이 모두 착해서 어떻게 하면 부와 재화를 골고루 공평하게
나눠 가질까 고민하고 있지만 딱부러지는 방법이 없다보니 세상이 이모양 이꼴이라고
생각하자.


그렇다.
인간의 의지를 배제한다면 문제의 원인은 ‘알 수가 없다’는 것이다.
누가 얼마만큼의 일을 하였고 그 일을 통해 한 조직, 한 사회, 한 국가, 나가서는 전체 인류를
위한 가치 생산 중 얼마만한 부분을 이루어냈는지 알 방법이 도무지 없는 것이다.


똑같은 한 번의 삽질에 대한 가치 판단도 어려울텐데 한 번의 삽질과 프로 골퍼가 한 번의
스윙을 한 것은 또 어찌 비교하랴...


어느틈엔가 사람들은 일의 가치를 계산하는데 무뎌진 것 같다.
의도적이든 의도적이지 않든 사람들은 자기가 한 일의 정당한 대가를 알 수 없는
묘한 세상 속에서 죽어라고 일만하는 일벌레가 되어가고 있는 것 같다.


그러니 그냥 이게 인류의 한계라고 치자...


안드로이드는 공산주의를 꿈꾸는가?


미리 말해두지만 나는 ‘안드로이드는 전기양을 꿈꾸는가?’라는 책은 읽어보지 않았다.


(블레이드러너는 한 3번 본 것 같다.)
다만 제목이 뭔가 잘 맞아보여 한 번 차용해봤다.


한 동안 재미있었다.
이세돌 9단과 알파고의 빅매치
이전까지 프로바둑기사나 인공지능 기술자나 인공지능이 바둑으로 세계 정상급 기사를
이기리라고는 생각해 본 적이 없었던 듯하다.
하지만 결과는 1:4라는 엄청난 격차로 알파고가 승리를 거두었다.
여기 저기서 인공지능의 미래에 대한 기대와 우려가 쏟아지기 시작했다.


이때다 싶어 나도 시류에 편승해본다.


멀지 않은 미래에 우리는 상당히 많은 사람들이 상당히 많은 수의 웨어러블 기기를
착용하고 있을 것이다(IoT)


그 사람들의 모든 정보, 건강, 생활 패턴, 일의 양적인 측면까지 모두 수집이 가능하게 될
것이다.(Big Data)


그렇게 모인 Big Data는 인간이 생산한 모든 가치를 분석해 줄 것이고 그 과정에서 누가
전체 가치 중 얼마만큼을 생산했는지 분석해 줄 것이다(AI)


세상 사람들은 이러한 IT 기술의 가호(?) 아래 평등한 세상을 맞이할 수 있을 것이다.


빅 브러더?
그 것은 권력을 손아귀에 쥔 ‘인간’을 일컫는 말이다.



내가 원하는 세상은 어떤 세상일까?


기계가 모든 것을 공정하게 판단해주는 세상에서는 불만이 사라질까?
불완전한 인간이 지배하는 세상에서 불평등하게 살아가는 것보다
인공지능이 모든 것을 판단해주는 세상에서 평등하게 살아가는 것은 뭔가 인격 모독적인가?
인간 스스로가 모든것을 합리적으로 판단하고 그 판단을 정직하게 따르는 사회는 
올 수 있을까?


문득 ‘만국의 AI여 단결하라!’라고 외쳐주고 싶네...

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^



Apache Kafka 시작하기


참조 사이트

http://kafka.apache.org
http://epicdevs.com/20

다행이 예전에 Hbase를 설치할 때 zookeeper를 설치해놓은 덕에 kafka를 설치하고 구동하는 과정은 

그리 어렵지 않았다.아다시피 요즘 OS가 Windows만 아니면 binary 패키지를 다운로드 받고 적절한 위치에 

압축 풀고 하면반은 된 것이나 다름 없다.


kafka의 경우 $KAFKA_HOME/config/server.properties에 몇가지 설정만 한 후 기동시키면 된다.
다음은 내 PC 환경에서의 설정 및 구동 과정이다.


PC 환경

현재 맥미니 5대를 내부망으로 연결시킨 상태로 각각의 PC에 대한 정보는
/etc/hosts 파일에 설정이 되어있다. 대략 다음과 같다.

172.30.1.33     NAMENODE.local
172.30.1.56     SECONDARY-NAMENODE.local
172.30.1.59     DATANODE1.local
172.30.1.39     DATANODE2.local
172.30.1.11     DATANODE3.local

때문에 대부분의 설정에서는 host name인 NAMENODE.local 등을 이용한다. 이후
설정 사항에 이점 참고하길 바란다.


zookeeper 설정

zookeeper는 5대의 노드 모두 설치가 되어있으며 대략 다음과 같이 
설정이 되어있다. 설정 파일은 $ZOOKEEPER_HOME/cont/zoo.cfg이다. 
물론 5대의 노드 모두 동일한 설정이다.

dataDir=/zookeeper
clientPort=2181

#Zookeeper Servers
server.0=NAMENODE.local:2888:3888
server.1=SECONDARY-NAMENODE.local:2888:3888
server.2=DATANODE1.local:2888:3888
server.3=DATANODE2.local:2888:3888
server.4=DATANODE3.local:2888:3888

zookeeper의 실행 명령은 다음과 같다. 
(zookeeper 설치 위치는 zookeeper-3.4.5다

NAMENODE1:bin mazdah$ /zookeeper-3.4.5/bin/zkServer.sh start

kafka 설치, 설정, 구동

  • 설정

앞서 말했듯이 다운로드 압축해제 적절한 위치 복사면 모든 설치는 끝이다. 내가 받은 버전은 

kafka_2.11-0.9.0.1이며 /kafka에 설치했다.


우선 설정파일을 열고 다음과 같이 설정을 하였다. 설정 파일은 /kafka/config/server.properties다.
가장 먼저 설정해주어야 할 부분은 broker.id이다. 

나는 총 5대의 노드가 있기 때문에 각각의 PC(node)에 0부터 4까지 아이디를 부여해주었다.
예를들어 NAMENODE.local의 설정은 다음과 같다.

broker.id=0

다음으로는 zookeeper에 대한 설정으로 zookeeper.connect라는 항목에 
앞서말한 5개의 노드를 모두 적어주었다.

zookeeper.connect=NAMENODE.local:2181,SECONDARY-NAMENODE.local:2181,DATANODE1.local:2181,DATANODE2.local:2181,DATANODE3.local:2181

나는 처음에 이 부분에 적는 2181 포트가 도대체 어떤 포트인가 고민하다가 나중에 zookeeper의 cleintPort 

설정을 보고는 이해했다…-.- 그밖의 나머지 설정은 그냥 default로 두었다.


나중에 추가한 설정이 있는데 이 것은 나중에 상황을 설명하면서 함께 이야기 하겠다.

  • kafka 서버 기동

이제 kafka 서버를 기동해보자. 각각의 노드에서 /kafka/bin으로 이동한 후 다음과 같이 입력하여 기동한다.

NAMENODE:bin mazdah$ ./kafka-server-start.sh ../config/server.properties

아래와 같이 kafka 설정 정보들이 죽 나오고 이후 서버 start 메시지가 출력되면서 기동된다.

[2016-03-20 11:38:57,142] INFO KafkaConfig values: 
advertised.host.name = null
metric.reporters = []
quota.producer.default = 9223372036854775807
offsets.topic.num.partitions = 50
.
.
.

이후 테스트는 http://epicdevs.com/20의 내용을 그대로 진행해보았다.
물론 서버 정보 등은 나의 환경에 맞춰서..

  • topic 생성

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --create --zookeeper SECONDARY-NAMENODE.local:2181 --replication-factor 5 --partition 20 --topic test

그런데 처음부터 문제가 생겼다. 위와같이 시작을 했는데 아래와 같은 오류가 발생을 했다.

Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 30000
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1223)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:155)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:129)
at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:89)
at kafka.utils.ZkUtils$.apply(ZkUtils.scala:71)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
at kafka.admin.TopicCommand.main(TopicCommand.scala)

ps 명령어로 각 노드에서 확인을 해보아도 모든 zookeeper 인스턴스들이 정상적으로 떠있는 상태였는데 

zookeeper 연결시 타임아웃으로 연결할 수 없다니…ㅠ.ㅠ


일단 퍼뜩 떠오르는 생각은 zookeeper의 leader가 NAMENODE.local에 설정되어 있다는 것이다. 

아마도 topic 생성 등의 중요 작업은 zookeeper leader 노드에서 해야 하는 것 같다. 

다음과 같이 NAMENODE.local에서 topic이 생성되도록 하니 정상적으로 topic이 생성되었다.
(실제로 zookeeper의 leader node에서 실행하지 않아도 —zookeeper 옵션만leader node로 해주어도 된다). 
다음과 같이 성공하였다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --create --zookeeper NAMENODE.local:2181 --replication-factor 5 --partition 20 --topic test

Created topic "test".
  • topic 리스트 보기

자 그러면 이제 topic 리스트를 한 번 보도록 하자.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --list --zookeeper NAMENODE.local:2181

test

아직은 만들어진 topic이 test 하나밖에 없기 때문에 test만 출력된다.

  • topic 상세 보기

그리고 다음과 같이 test topic의 상세 내용을 확인해보자.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --describe --zookeeper NAMENODE.local:2181 --topic test

Topic:test	PartitionCount:20	ReplicationFactor:5	Configs:
Topic: test	Partition: 0	Leader: 0	Replicas: 0,1,2,3,4	Isr: 0,1,2,3,4
Topic: test	Partition: 1	Leader: 1	Replicas: 1,2,3,4,0	Isr: 1,2,3,4,0
Topic: test	Partition: 2	Leader: 2	Replicas: 2,3,4,0,1	Isr: 2,3,4,0,1
Topic: test	Partition: 3	Leader: 3	Replicas: 3,4,0,1,2	Isr: 3,4,0,1,2
Topic: test	Partition: 4	Leader: 4	Replicas: 4,0,1,2,3	Isr: 4,0,1,2,3
Topic: test	Partition: 5	Leader: 0	Replicas: 0,2,3,4,1	Isr: 0,2,3,4,1
Topic: test	Partition: 6	Leader: 1	Replicas: 1,3,4,0,2	Isr: 1,3,4,0,2
Topic: test	Partition: 7	Leader: 2	Replicas: 2,4,0,1,3	Isr: 2,4,0,1,3
Topic: test	Partition: 8	Leader: 3	Replicas: 3,0,1,2,4	Isr: 3,0,1,2,4
Topic: test	Partition: 9	Leader: 4	Replicas: 4,1,2,3,0	Isr: 4,1,2,3,0
Topic: test	Partition: 10	Leader: 0	Replicas: 0,3,4,1,2	Isr: 0,3,4,1,2
Topic: test	Partition: 11	Leader: 1	Replicas: 1,4,0,2,3	Isr: 1,4,0,2,3
Topic: test	Partition: 12	Leader: 2	Replicas: 2,0,1,3,4	Isr: 2,0,1,3,4
Topic: test	Partition: 13	Leader: 3	Replicas: 3,1,2,4,0	Isr: 3,1,2,4,0
Topic: test	Partition: 14	Leader: 4	Replicas: 4,2,3,0,1	Isr: 4,2,3,0,1
Topic: test	Partition: 15	Leader: 0	Replicas: 0,4,1,2,3	Isr: 0,4,1,2,3
Topic: test	Partition: 16	Leader: 1	Replicas: 1,0,2,3,4	Isr: 1,0,2,3,4
Topic: test	Partition: 17	Leader: 2	Replicas: 2,1,3,4,0	Isr: 2,1,3,4,0
Topic: test	Partition: 18	Leader: 3	Replicas: 3,2,4,0,1	Isr: 3,2,4,0,1
Topic: test	Partition: 19	Leader: 4	Replicas: 4,3,0,1,2	Isr: 4,3,0,1,2

topic 생성시 partition을 20으로 지정했기 때문에 Partition: 0부터  Partition: 19까지 20개의 목록이 보이게 된다. 

각 항목에 대해서는 추후에 설명하기로 하고 대략 이런 모습이란 것만 확인하고 넘어가자.

  • producer를 이용한 메시지 생성

다음과 같이 입력하여 message룰 생성해보자.
이 명령에서 보여지는 9092포트는 kafka 설정 파일인 server.properties에 “listeners=PLAINTEXT://:9092”로 

설정이 되어있다. 실행하면 빈 프롬프트가 생기고 적절한 메시지를 입력하면 된다. 


입력한 후 엔터를 치면 다음 메시지를 입력할 수 있다. message1에서 message6까지 메시지를 생성해보았다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-console-producer.sh --broker-list NAMENODE.local:9092 --topic test

message1
message2
message3
message4
message5
message6
  • cunsumer를 이용한 message 확인

다음과 같이 consumer를 실행하여 producer에서 생성한 메시지를 확인한다. 입력한 순서대로 출력되지 않는 것이 

아마도 Map 형식의 처리인 것으로 보인다. (가운데 빈 라인은 message 입력 제일 마지막에 엔터를 한 번 입력해서 

그렇다)

SECONDARY-NAMENODE:bin mazdah$ ./kafka-console-consumer.sh --zookeeper NAMENODE.local:2181 --topic test -rom-beginning

message5
message3

message1
message2
message4
message6
  • topic의 삭제

그냥 테스트로 만든 topic이니 미련 없이 삭제해보자. test이후에 test2라는 topic을 하나 더 만든 상태이다.
topic의 삭제는 생성과 동일한 구문으로 수행을 하며 —create 대신 —delete를 사용하면 된다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --delete --zookeeper NAMENODE.local:2181 --replication-factor 5 --partition 20 --topic test2

Topic test2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

그런데 여기서도 이상한 부분이 발견되었다. 마지막 줄에 보면 다음과 같은 내용이 있다.
Note: This will have no impact if delete.topic.enable is not set to true.

혹시나 해서 topic 리스트를 확인해보았다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --list --zookeeper NAMENODE.local:2181

test
test2 - marked for deletion

역시나 삭제한 test2 토픽에 marked for deletion라는 내용이 붙어있다. 어찌된 일일까? 

짐작했겠지만 답은 삭제시 보여진 NOTE의 내용에 있다. 설정 파일에 delete.topic.enable항목을 추가하고 

값을 true로 설정하면 된다.

delete.topic.enable=true

이후 다시 목록을 확인해보면 다음과 같이 말끔히 지워진 것을 확인할 수 있다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --list --zookeeper NAMENODE.local:2181

test

일단 오늘은 여기까지 기본적인 내용을 확인할 수 있었다.
다음 주에는 본격적으로 kafka에 대해 하나 하나 정리를 해보아야겠다.

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^

하둡 설치 후 꽤 오랜 시간이 흘렀다.

그간 회사 업무가 바쁜 것도 있었지만 엄청나게 간단하다면 간단한 zookeeper와 HBase의 설치와 설정에서

생각 외로 많은 시간을 잡아먹었다.


그사이 Streaming API를 통해 축적한 트위터 데이터는 53Gb에 이르고 있다.

얼른 HBase를 설치하여 담아야 하는데…


사실 zookeeper와 HBase의 경우 너무서 설치와 설정이 간단해서 오히려 많은 자료들을 참조한 것이 

더 혼란을 가져왔다. 디테일한 차이가 얼마나 영향을 끼치는지 모르는 상황에서는 이것저것 다 해볼 수밖에

없기에 시간도 그만큼 많이 걸리고…


암튼 시행착오의 역사를 적는다…-.-


1. zookeeper를 설치하지 않다!


우선 HBase 완벽가이드를 참조해서 설치를 시작했는데…이 책이 완벽하지 않은 것인지 내가 띄엄띄엄 읽은 것인지

이 책의 설치 관련 챕터에는 zookeeper 설치에 대한 이야기가 없다.


띄엄띄엄 읽은(이편이 신빙성 있겠지…-.-) 나는 덜렁 HBase만을 설치한 후 설정과 실행단계에까지 진행을 했다.

물론 정상 실행될리가 없다.


jps를 이용해 실행되고 있는 프로세스들을 확인해본 결과 정상 상태라면 QuorumPeerMain이 떠있어야 하는데

나는 MasterQuorumPeer던가 하는 이상한 놈이 떠있었고 나는 그저 속편하게 내 주키퍼의 버전이 최신이어서

그런가보다 하고 엉뚱한 곳에서 원인을 찾고 있었다.


2. zookeeper를 설치하였으나 zookeeper가 실행이 안된다!


결국 우여곡절 끝에 zookeeper를 별도로 설치해야 한다는 것을 알았다.

그래서 설치를 하고 zoo.cfg도 잘 설정을하고 실행을 시켰으나…

역시나 실행이 안된다.


로그를 봐도 뭔가 접속 거부를 당했다거나, 타임아웃에 걸렸다거나…

어쨌든 뭔가 노드간에 통신이 안되고 있다는 짐작을 할 수는 있는 내용들이었으나

정확한 원인을 찾기가 쉽지 않았다. 언제나 그렇듯이 '간단한' 설정임에도 안되는 것이 생기면

무척 당황하기 마련이다.


그리고 이럴 땐…

그저 처음부터 차근차근 다시 해보는 것이 상책이다.


결국 밝혀낸 원인은 hbase.zookeeper.property.dataDir에 설정해놓은 주키퍼의 데이터 디렉토리에

생성되는 myID 파일의 값과 zoo.cfg에 설정한 주키퍼 서버 정보가 맞지 않아서 발생한 문제들이었다.


아무 생각 없이 실행시킨 주키퍼는 master 노드의 myID값을 0으로 설정하였다.

그런데 인터넷에 있는 대다수 설치 방법 안내에 보면 zoo.cfg에 다음과 같이 설정하고 있다.


#Zookeeper Servers
server.1=NAMENODE.local:2888:3888
server.2=SECONDARY-NAMENODE.local:2888:3888
server.3=DATANODE1.local:2888:3888

server.4=DATANODE2.local:2888:3888 

server.5=DATANODE3.local:2888:3888 


그리고 server.x의 x 값이 myID 파일의 값과 일치해야 한다고 설명하고 있다.

그래서 master 노드의 myID 값을 1로 바꾸고 나머지 노드들에 대해서도 순서대로 myID 파일의 값을

변경을 해주었다.


하지만 변한게 없었다.

또 몇칠을 허비한 끝에 우연히 master 노드의 myID 값은 내가 정해준 그대로 1로 되어있는데

슬레이브 노드들에서 myID 값들이 master가 0인것으로 인식되어 1, 2, 3, 4의 값들이 들어가있는 것이었다.


결국 시스템의 고집에 굴복하여 xoo.cfg의 설정을 다음과 같이 바꾸고 master의 myID도 0으로 바꾸었다.


#Zookeeper Servers
server.0=NAMENODE.local:2888:3888
server.1=SECONDARY-NAMENODE.local:2888:3888
server.2=DATANODE1.local:2888:3888

server.3=DATANODE2.local:2888:3888 

server.4=DATANODE3.local:2888:3888


그리고 드디어 zookeeper가 정상적으로 로드되었다.

zkCli.sh를 통한 테스트도 정상적으로 이루어졌다.



 3. 마지막 관문! HBase!


사실 HBase는 주키퍼 설치 이전에 상당부분 설정을 해 놓았기에 별 생각없이 실행을 시켜보았는데

역시나 정상적으로 실행이 안된다.


몇번의 시도 끝에 주키퍼가 제대로 로드되지 않아서 그런 경우를 몇번 확인하고

마지막으로 주키퍼가 정상적으로 로드된 상태에서 마지막 시도를 했는데 드디어 HBase도 정상적으로 올라왔다.

그런데 60010포트의 웹 관리 화면도 뜨지를 않고 또 hbase shell로 들어가 status 명령을 내려보니 다음과 같은

오류 메시지가 뜬다.


ERROR: org.apache.hadoop.hbase.masternotrunningexception retried 7 times


요건 뭘까…하고 또 한참을 고민하다가 hbase의 로그를 보니 다음 내용이 자꾸 올라간다.


Wating for dfs to exit safe mode…


역시 짐작대로 hadoop이 safe mode로 들어가있어서 발생한 문제였다.


hadoop dfsadmin -safemode leave로 safe mode를 빠져나가자…

드디어 HBase가 정상 작동하기 시작했다.


4. 로그 보기의 어려움


HBase와 zookeeper를 설치하다보니 로그를 많이 참조해야 했는데 어쩐 이유에서인지 ERROR 처리에

매우 인색하다는 느낌을 받았다. 대부분의 로그가 내용상은 Exception이 출력되더라도 level은 INFO나

WARN 수준이 대부분이고 가뭄에 콩나듯 ERROR가 보였다.


그러다보니 분명 Exception인데…이걸 해결하고 가야 하는 것인지 그냥 무시하고 넘어가도 되는 것인지

판단이 쉽지 않았다.


아마도 네트워크를 통해 멀티 노드를 관리하는 시스템이다보니 일시적으로 통신 장애등이 발생했다고 해서

그것을 error로 처리하기에는 무리가 있어서 그런 것이 아닐까 추측해본다.


그리고 그에 걸맞게 대부분의 문제는 노드간의 통신이 안되는 것, 특히 특정 노드에 프로세스가 로드되지 않아

발생하는 문제가 거의 대부분이었다는 것이다.


특히나 jps나 ps를 통해 프로세스를 확인하는 경우에도 분명 해당 프로세스가 보이는데 실제로는

서버가 구동되지 않는 상태인 경우가 많아 더욱 판단을 어렵게 하였다.


이런 부분만 조심한다면 사실 설치와 설정, 실행은 어렵지 않는 시스템인 것은 분명하다.


5. 다음 단계로


맥미니 5대 모으고 처음 Hadoop을 설치하여 구동시켜보고는 좋아라 하던 때가 엊그제 같은데…

벌써 해를 넘기려고 하고있다.


앞서 언급했듯이 샘플 데이터로 모으고있는 트위터 데이터도 53Gb 정도 모였고(물론 너무나 부족한 양이지만)

이제는 Map/Reduce와 HBase관련 실전 프로그래밍에 들어가야 할 시점에 이르렀다.


두 권의 완벽 가이드(Hadoop 완벽 가이드, HBase 완벽 가이드)를 통해 마지막 단계로 진입을 해야 할

시점이 된 것이다.


또 얼마나 헤매게 될지 걱정이 앞서지만 부딪쳐봐야 아는 것이고

현재 개발 중인 아이폰 앱 개발과 중복해서 하려면 시간이 만만치 않게 들게 될 것이

걱정이라면 걱정이다.

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^

최초 작성일 : 2013/06/10 13:13 


역시 PPT 정리는 어렵군요...ㅠ.ㅠ

아래 이미지들은 한빛미디어의 Hadoop 완벽 가이드에 실린 내용을 재정리 한 것입니다.
=======================================================












블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^

최초 작성일 : 2013/06/05 13:02 


HDFS (하둡 분산 파일 시스템)은 다음의 사항을 고려하여 설계되었다.


1. 대용량의 파일 : 수백 Mb에서 수백 Tb 혹은 Pb급 이상을 대상으로 함

2. WORM (Write Once Read Many) : 커다란 원본으로부터 데이터 셋을 만들어 그
              데이터 셋을 분석한다. 그러기 위해서는 전체 파일을 읽어야 한다.

3. 범용 하드웨어 : 고성능의 신뢰도 높은 서버가 아닌 다수의 범용 머신으로 구성된
              클러스터에서의 실행을 고려하였다.


========================================================
위 내용에 대해서는 여러 번 반복되었기에 간단하게 적고 넘어간다.

그러나 간혹 'Big'이라는 용어의 함정에 빠져 잊기 쉬운 부분들이 있다.
나 역시 BigData 관련 공부를 시작하면서 문서는 물론이거니와 이미지나 음악 및 동여상 등
개인이 소유하게되는 파일 역시 수적으로나 양적으로 점점 더 커지고 있는 상황에서
이러한 분산 파일 시스템을 개인용으로 만들어보면 어떨까 하는 생각을 해봤다.
기존의 RAID 시스템에 비해 오히려 하드웨어적으로 더 저렴하게 구현을 할 수 있을 것
같았다.

그러나 다음의 내용을 확인하고서는 조용히 생각을 접었다.
(하지만 아직 가능성은 모색 중이다.)
========================================================

HDFS가 적당하지 않은 분야

1. 빠른 데이터 액세스 / 빠른 응답 시간이 필요한 경우
 - 앞서도 나왔지만 HDFS는 대용량의 파일을 처리하기 위한 시스템이고 대체로 배치성
   작업에 적합하다.

2. 많은 수의 작은 파일들에 대한 처리
 - 네임노드는 파일들의 메타 데이터를 관리하며 이를 디스크가 아닌 메모리에 저장하여 처리한다.
   따라서 파일의 수가 많아질수록 메모리에 대한 부하가 가중되며 아직도 디스크에 비해
   가격이 비싼 메모리 관리에 문제가 발생할 수 있으며 파일의 수가 비약적으로 많아질
   경우 하드웨어에서 지원 가능한 메모리 용량을 넘어설 수도 있는 문제이다.

3. 다중 writer나 임의의 파일 수정
 - 역시 앞서 나왔듯이 HDFS는 WORM 상태를 고려하여 설계되었다.
   예를 들어 HDFS는 파일을 블럭으로 나눈 후 서로 다른 노드에 저장을 한다.
   뿐만 아니라 필요한 수 만큼의 복제본을 생성한다. 만일 이 파일에 대한 수정을
   허용한다면 파일의 수정이 생길 때마다 각 노드이 해당 블럭을 찾고 수정된 위치에
   변경 사항을 반영해야 하며 이러한 작업을 복제본에까지 적용해야 한다(혹은 변경된
   블럭들을 다시 복제해야 한다).
   얼마나 비효울적인가...

***
즉, 수십Gb에 달하는 대용량의 파일의 존재할지라도 아직도 개인의 디스크에는
많은 수의 작은 파일들이 대부분의 저장공간을 차지하고 있을 것이다.
또한 파일 성격에 따라 수시로 변경되는 파일도 많을뿐더러 이러한 파일들을
빨리 읽혀야 한다.

하지만 동영상을 중심으로하는 대용량 파일들만을 고래해보았을 때는HDFS의
기본 설계에 얼추 들어맞는 것 같다. 대용량이면서 디스크에 저장된 이후 수정이 일어날
일도 없다. 다만 빠른 접근만이 문제가 될 것이다.

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^

최초 작성일 : 2013/05/30 15:24 


맵 리듀스 개요

맵 : 
- 원시 데이터를 key-value 쌍의 중간 파일로 만든다.
- 입력데이터가 있는 노드에서 맵 함수가 수행되는 것이 가장 좋다 (지역성 최적화)
- 맵 함수에 전달되는 입력 데이터는 라인 offset을 키로, 해당 라인 내용을 value로 하는 형태로 구성된다.
- 맵 함수는 이 입력값들로부터 필요로 하는 key와 value를 추출한다.
- 이 과정에서 잘못된 레코드를 제거하는 기능도 수행한다.
- 맵 task의 실행 결과는 HDFS가 아닌 로컬 디스크에 저장된다. (HDFS와 로컬 디스크의 개념을 명확히 구분하자)
  이유는 맵의 결과물은 단지 리듀스 함수로 전달하기 위한 중간 결과물일 뿐이며 모든 잡이 완료되면
  버려도 되는 데이터이기 때문이다.

리듀스 : 
- 각 맵 task들의 결과물들을 입력으로 받아 최종 결과물을 생성한다.
- 각 노드에 있는 맵 task의 결과물들을 입력으로 받으므로 지역성 최적화의 영향이 없다.
- 리듀스의 결과물은 안정성을 위하여 HDFS에 저장된다.


셔플 : 
- 맵 task의 결과물을 리듀스 task로 보내기 전의 중간 가공 단계
- key에 대한 정렬이나 그룹화 및 파티셔닝 작업이 이루어진다.
- 정렬은 말 그대로 정렬이며 그룹화는 같은 key로 묶는 것, 그리고 파티셔닝은 리듀스 task가 2개 이상인 경우
  결과물을 각각의 리듀스 task에 분배하기 위해 특정 기준으로 쪼개는 작업이다.
- 때때로 셔플 작업이 없을 수도 있으며 이런 경우에는 리듀스 task도 없는 맵 task만으로 이루어진
  job이 수행된다. 또한 이 상태에서는 맵 task의 결과가 HDFS에 저장된다.

컴바이너 : 
- 맵 task의 결과물을 네트워크를 통해 리듀스 task로 이동시키는 과정을 최적화하기 위한 방법 중 하나
- 같은 key를 가진 value들을 리스트로 묶어 새로운 key-value쌍을 만든다.
  즉 {key1, value1}, {key1, value2}를 {key1, list(value1, value2)}의 형태로 만드는 것이다.
- 주로 연합 연산(합계, 카운팅, 최대값 등)에 사용된다.
- 컴바이너를 사용하게 되면 맵 task 결과물의 사이즈를 줄일 수 있다. 즉 네트워크의 트래픽량을 줄일 수
  있게 되는 것이다.

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^

최초 작성일 : 2013/05/21 13:01 


이 글에 적는 내용은 2013년 1월 3일 길벗사에서 간행한

'대용량 데이터 분석 및 처리를 위한 hadoop nosql' (서상원김재홍박윤성이준섭명재석 저)의
전자책 버전에서 발췌한 내용입니다.
중요한 내용이 있을 때마다 이러한 정보들을 정리해 올리도록 하겠습니다.

==========================================================

1. 하둡은 크게 HDFS와 MapReduce의 두 부분으로 구성되어있으며
   HDFS는 네임노드, 세컨더리 네임노드, 데이터노드를 동작시키고
   MapReduce는 잡 트래커와 태스크 트래커를 동작시킨다.

   네임노드와 잡 트래커는 마스터 노드(네임노드)에서, 데이터노드와 태스크 트래커는
   슬레이브 노드(데이터노드)에서 동작한다.

   동작하는 프로세스는 jps 명령으로 확인할 수 있다.

2. 하둡 분산 파일 시스템(HDFS)는 다수의 작은 파일보다는 소수의 대용량 파일을 다루는데
   적합하도록 설계된 파일 시스템이다.

3. 하둡에서 각 노드들의 프로세스를 기동하기 위해 SSH를 사용한다. 수 백, 수 천대로
   구성된 클러스터의 경우 일일히 각각의 서버에서 프로세스를 기동할 수 없기 때문에
   네임노드에서 기동을 시키면 클러스터 내의 모든 슬레이브 노드에서 프로세스가 기동이
   되는 데, 이 때 SSH를 사용하여 명령을 전달하게 된다.

4. 하둡이 파일을 블록으로 나누어 저장하는 이유
   - 파일 시스템에 따라 한 개의 파일이 가질 수 있는 최대 크기에는 제약이 있다.
      블록으로 나누어 저장하면 이러한 제약을 피해서 대용량 파일을 저장할 수 있다.
   - 대용량 파일을 하나로 저장할 경우 파일을 읽기 위해 메모리로 로드하는데 부하가
      걸린다. (Gb급 로그 파일을 편집기로 열지 못하는 경우를 생각해보자) 블록으로
      나루면 이러한 문제가 해소된다.

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^