Cluster : The Beginning - Raspberry pi에 Apache Kafka 설치하기


이전 작업으로 일단 기본적인 출발은 마무리가 되었다.
하지만 EMQ를 설치한 내용에서 언급했듯이 MQTT는 경량화 프로토콜로 주로 IoT에 특화되어 있다고 볼 수 있어
(Facebook Messanger에서 MQTT를 사용한다고 하는데 현재형인지 또 어떤 영역에 어떻게 사용하는지는 잘
모르겠다) 아직은 절반의 성공일 뿐이다.


센서 데이터 분석을 위한 환경 뿐만 아니라 일반적인 데이터 분석에 대한 환경을 갖추기 위해서는 MQTT라는 진입점
만으로는 부족한 것이다. 그래서 일반 데이터를 수집하는 부분은 예전에 한 번 시도를 해보았던 Apache Kafka를
이용하기로 했다. Kafka를 이용해 수집할 데이터도 예전과 마찬가지로 트위터의 데이터가 될 것이다. 다만 클라이언트는
예전에는 Node.js를 이용하여 구현했으나 이번에는 다른 방식을 찾아볼 생각이다.


이번 포스팅은 바로 이 Apache Kafka를 라즈베리파이에 설치하고 구동하는 과정을 정리해보겠다.
사실 2016년도에 정리한 내용의 축약 버전이나 다름없어 마지막 정리에 2016년에 포스팅한 내용을 모두
링크하였으니 참고하시길 바란다.


개요 - Kafka는 MQTT와 뭐가 다른가?


일단 Kafka 역시 Message Queue기반의 시스템이다. 용어의 차이는 있지만 대체로 구성이 비슷하다.




아주 심플하고 직관적인(하지만 정확하진 않은…-.-) 비유를 들자면 다른 Message Queue 시스템을 퀵서비스라고
한다면 Kafka는 택배라고 할 수 있을 것이다. 퀵서비스는 작은 물건이나 서류를 송신자로부터 수신자 에게 직접 전달을
해주지만 택배는 큰 덩치의 물건들을 물류창고에 집하했다가 다시 배송을 한다. 하지만 이 것은 어디까지나 간단한 비유고
자세한 차이점은 아래 블로그에 잘 정리가 되어있다.

http://epicdevs.com/17


일단 Kafka는 용량이 큰 데이터 전송에 유리하고 클러스터를 통해 데이터를 ‘복제’해둘 수 있으며, Message Queue가
broker에서 subscriber로 topic을 push해주는 반면 kafka는 consumer가 필요할 때 broker로부터 pull 방식으로
가져다 쓸 수 있다는 차이 정도만 알아두면 될 것 같다.


zookeeper 설치, 설정, 실행


개요에서 설명한 것과 같이 Kafka의 경우 클러스터를 구성하여 분산처리를 할 수 있으며 전송되는 데이터를 여러 노드에
복제해놓을 수도 있다. 하지만 이러한 분산 처리를 하기 위해서는 zookeeper라는 분산 코디네이터의 도움을 받아야
한다. 즉, Kafka를 사용하기 위해서는 zookeeper를 먼저 설치해야 한다는 뜻이다.


라즈베리파이에 zookeeper 설치는 매우 간단해서 그냥 바이너리 배포판을 다운로드 받아 적당한 위치에 압축을
풀고 환경에 맞게 설정한 후 실행을 하면 된다…-.-


나는 일단 3.4.10 버전을 받아서 /opt/zookeeper에 압축을 풀었다.
설정은 딱 3가지만 하면 된다.

#데이터를 저장할 디렉토리를 설정한다.
dataDir=/var/lib/zookeeper

#간단하게 기본 설정 파일에서 주석만 풀어주면 된다. 
#주석 처리되어있으면 서버간 통신 때 connection refused가 발생한다.
maxClientCnxns=60

#zookeeper의 클러스터는 별도로 앙상블이라고 불리우는데 앙상블을 구성할 서버 주소를 적어준다.
server.0=172.30.1.54:2888:3888
server.1=172.30.1.13:2888:3888
server.2=172.30.1.42:2888:3888


zookeeper 앙상블이 정상적으로 실행되기 위해서는 dataDir에 지정된 경로에 myid 파일이 필요하며 이 파일에는
3번째 서버 설정에서 정의된 서버 ID가 적혀있어야 한다. 위 설정을 기준으로 보자면 server.0 서버에는 0이,
server.1 서버에는 1이, server.2 서버에는 2가 적혀 있어야 한다.


앙상블(클러스터)을 구성하는 모든 서버에 동일한 설정을 해주고 나서 각 서버에서 아래와 같이 zookeeper 서버를
실행해준다.

$ $ZOOKEEPER_HOME/bin/zkServer.sh start


한가지 주의할 사항은 클라이언트의 요청을 처리하는데 있어서 leader 역할을 하는 한 대의 노드에서만 읽기와 쓰기가
모두 가능하다. follower에서는 오직 읽기만 처리 가능하며, 만일 쓰기 요청이 오면 각 follower 노드들은 그 요청을
leader 노드에 위임하게 된다.


어느 서버가 leader고 어느 서버가 follower인지는 zookeeper에서 확인 가능한데 다음 명령어로 확인 가능하다. 
아래는 현재 노드가 follower임을 보여준다.




zookeeper에 대한 기본적인 내용은 이 것이 전부다. 하지만 분산 시스템을 관리한다는 본연의 임무를 생각해본다면
zookeeper에 대해 알아야 할 내용은 상당히 많다. 또한 zookeeper API를 이용하면 zookeeper를 통해 관리되는
분산 시스템을 직접 만들 수도 있다. 한마디로 zookeeper에 대한 것만 공부하자고 해도 상당히 많은 시간이 필요하므로
여기서는 이정도로 마무리 하겠다.


Kafka 설치


지금껏 진행해온 다른 시스템 설치와 마찬가지로 설치 자체는 매우 간단하다. 바이너리 배포본을 다운로드 한 후
압축을 풀고, 설정하고, 실행하면 된다.


나의 경우 일단 Kafka는 2.11-1 버전을 다운로드 받았고 /opt/kafka에 압축을 풀었다.




Kafka 설정


Kafka의 설정 파일 위치는 다음과 같다. 나는 /opt/kafka에 설치를 했으니 /opt/kafka/config 아래에 있는
server.properties를 수정하면 된다.




이전과 마찬가지로 반드시 설정해야 할 내용만 정리해보자.

#앞서 zookeeper 설정에서 설명한 서버 아이디를 적어준다. 여기는 172.30.1.54 서버이므로
#server.0=172.30.1.54:2888:3888 설정을 참고하여 0으로 설정하였다.
broker.id=0

#로그를 기록할 경로를 지정한다.
log.dirs=/var/lib/kafka-logs

#topic을 저장할 파티션을 몇개로 할 지 지정한다. 서버가 3대이니 일단 3으로 지정해보았다.
#이렇게 설정하면 하나의 데이터 파일이 3개로 쪼개져서 저장이 된다.데이터 파일이 Topic으로 들어오는 데이터가       #3영개의 영역으로 나뉘어서 저장이 된다.
#하지만 partition이란 하나의 Topic을 몇 개로 나눌지를 결정하는 것이지 #물리적 서버의 댓수와는 상관이 없다. num.partitions=3 #데이터 파일의 복제본을 몇개나 가지고 있을지 설정한다. 3으로 설정했으니 3개의 복제본이 존재하게 된다. offsets.topic.replication.factor=3 #클러스터 관리를 위한 zookeeper 서버 목록을 적는다. zookeeper 설정에서는 IP 주소로 설정했는데 #여기서는 host 이름으로 설정하여 일관성이 없는 설정이 되긴 했지만...-.- #각 서버는 다음과 같이 매핑되니 참고하시길 바란다. #rpi1=172.30.1.54, rpi2=172.30.1.13, rpi3=172.30.1.42 zookeeper.connect=rpi1:2181,rpi2:2181,rpi3:2181


위 내용만 설정하면 kafka 서버를 실행할 수 있다.


마지막으로 num.partitions과 offsets.topic.replication.factor 설정이 어떻게 반영되는지에 대해 아래와 같이
그림으로 간단하게 표현을 할 수 있다.




위 그림을 설명하자면 다음과 같은 구조의 경우라 볼 수 있다.

  • 3개의 노드 
  • 3개의 파티션(num.partitions=3)
  • 3개의 복제본(offsets.topic.replication.factor=3)


여기서 주의해서 볼 것은 leader와 follower로 항상 leader에만 쓰기가 가능하며 leader에 데이터가 기록되면
기록된 데이터가 follower로 복제된다는 것이다.


보다 상세한 내용은 좀 더 공부를 한 후 다시 정리해보도록 하겠다.


실행


실행은 /opt/kafka/bin경로로 이동하여 다음과 같이 입력하면 된다.

$ ./kafka-server-start.sh ../config/server.properties


만일 JMX를 이용한 모니터링 도구를 이용하고자 한다면 다음과 같이 실행한다.

$ env JMX_PORT=9000 ./kafka-server-start.sh ../config/server.properties


이렇게 실행을 한 후 아래 2개의 모니터링 도구를 사용할 수 있다.

Kafka Offset Monitor : http://quantifind.github.io/KafkaOffsetMonitor/

Kafka Manager : https://github.com/yahoo/kafka-manager


주의 사항


2016년도에 Kafka를 설치할 때는 맥미니에 설치하였지만 이번에는 라즈베리파이에 설치를 하였다.
이 차이는 결코 작은 차이가 아니다. 하드웨어 사양으로 인해 라즈베리파이에서는 사용할 수 있는 자원이
매우 한정적인 것이다. 실제로 라즈베리파이에서 kafka를 실행했을 때 잦은 빈도로 메모리 부족 현상이
발생을 하였다. 따라 일단 라즈베리파이에서 안정적으로 실행을 하기 위해서는 kafka-server-start.sh 파일의
다음 부분을 찾아 Xmx와 Xms를 적절히 수정해주어야 한다. 기본 값은 1G이다.

export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"



정리


사실 라즈베리파이에서 설치 및 구동을 한 것 외에는 2016년도에 정리했던 내용과 별 다른 점이 없다.
다시 정리한다는 것이 중복된 작업일 수도 있으나 복습한다는 의미로 한 번 더 정리해보았다.
그런 의미에서 가장 중요한 부분은 ‘주의 사항’이 아닐까 싶다.


가장 기초적인 설치,설정,실행 부분만 짚고 넘어갔으니 2016년도의 Kafka 관련 모든 글을 링크하면서
이번 포스팅을 마칠까 한다.











블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^






Cluster : The Beginning - Raspberry pi에 HAProxy 설치하기


지난시간 까지 2대의 라즈베리파이에 EMQ를 클러스터링하여 설치하는 작업을 진행하였다.
그런데 한 가지 문제가 생겼다. 일반 가정집에서 공유기를 통해 접속을 하다보니 2대 중 한대로만 접속이 이루어진다는
점이다. 혹시나 해서 공유기의 트래픽 관리에 외부 포트는 동일하게, 내부 서버는 서로 다르게 설정을 해보아도 역시나
가장 마지막에 등록된 정보로만 통신이 이루어졌다.


이렇게 되면 세션은 공유가 되지만 만약 외부에서 접속되는 한 대의 서버가 다운되면 전체 시스템을 사용할 수 없게
되는 것이다. 결국 접속을 분산시킬 필요가 생겼다. 그리고 그렇게 해서 HAProxy라는 솔루션을 찾아내었다.


더불어 애초에 EMQ 2대 Apache Kafka 3대로 클러스터를 구성할 생각이었는데 부랴부랴 HAProxy를 위한 
1대의 라즈베리파이를 추가로 구입하여 총 6대의 클러스터가 되었다.



Clustering과 Load Balancing


일반적으로 성능 향상을 위해 여러대의 서버에 동일한 구성을 하여 함께 운영하는 경우가 많다.
그리고 이렇게 구성하는 경우 Clustering이라든가 Load Balancing이라는 용어가 많이 쓰이는데 과연 
이 둘의 차이는 무엇일까?


가장 단순하게 표현하자면 Clustering은 여러대의 컴퓨터 그룹이 같은 정보를 가지고 함께 일을 하는 것이고
Load Balancing이라는 것은 각각의 컴퓨터가 공평하게 일할 수 있도록 일감을 나누어 주는 것이라고 생각하면
되겠다. 이렇게 정의할 때 Clustering은 같은 정보에 방점이 찍히고 Load Balancing은 나누어 주는
방점이 찍히게 된다. 즉, Clustering은 그룹을 지어 함께 일을 하는 서버간의 관계에 대한 개념이고 Load Balancing은
클라이언트에서 서버로 접속하는 과정에서의 처리에 대한 개념이라고 보면 되겠다.

 



여러 대의 컴퓨터를 대상으로 한다는 것 외에는 서로 다른 개념이고 종속관계를 나눌 수는 없지만
최근 Clustering을 지원하는 솔루션들은 내부적으로 Load Balancing을 함께 지원하는 경향이 있다. 하지만 
Load Balancing을 위한 솔루션이 Clustering을 지원하는 경우는 거의 없다.


예를 들어 1대의 서버로 서비스를 제공할 경우 그 서버가 다운되면 서비스 자체를 제공하지 못하므로 2대의 서버에 
각각 Tomcat을 설치하여 동일한 서비스를 제공하고 싶다고 하자. 각각을 Tomcat1, Tomcat2라고 했을 때
Tomcat1로만 또는 Tomcat2로만 접속이 가능하다면 2대의 서버를 구성한 의미가 없어진다. 그래서 서버 앞단에
Load Balancing을 위한 장비를 두고 적절한 알고리즘을 통해 각각의 클라이언트들이 Tomcat1과 Tomcat2에
적절하게 분산되어 접속하도록 조정을 해주는 것이다.


그런데 Load Balancing만 하는 경우에는 문제가 발생할 수 있다. 아래 그림을 보면 client1과 client2가 Load 
Balancer를 통해 각각 Tomcat1과 Tomcat2로 적절하게 나누어 요청을 수행하고 있다. client1은 Tomcat1에
문서 파일인 B를 업로드 했고 client2는 Tomcat2에 그림파일인 A를 업로드 했다. 그런데 Tomcat2 서버에
장애가 발생하여 접속이 불가능하게 되면 Load Balancer는 이후 모든 접속을 정상 작동 중인 Tomcat1로 보낸다.
이 때 client2의 입장에서는 Load Balancer 이후의 구조에 대해서는 알지 못하므로 분명 자신은 그림 파일 A를
서버에 업로드 했는데 그 업로드한 파일을 찾을 수 없게 된다.




이러한 문제 때문에 보통 Load Balancing을 하면 Clustering도 함께 고려를 해야 한다. 물론 간단하게 Tomcat은
이중화를 하되 DB는 1대만 설치하여 중요 정보만 공유하는 것도 고려해볼 수 있을 것이다.


결론적으로 Clustering과 Load Balancing은 다음과 같이 구분지어 볼 수 있을 것이다.



HAProxy


EMQ의 경우 clustering을 통해 상태 공유는 되지만 클라이언트와 접속하는 지점에서의 load balancing은 별도의
작업을 해주어야 한다. 일반적으로 널리 알려진 load balancing 장비는 L4 Switch라는 하드웨어 장비가 있다.
하지만 개인이 그런 비싼 네트워크 장비를 사용할 수도 없을뿐더러 사용할 필요도 없다…우도살계인 격이다.


그러다가 발견한 것이 HAProxy라는 솔루션이다. S/W적으로 load balancing을 지원해주는 솔루션이었다.
홈페이지는 다음과 같다.

http://www.haproxy.org


사실 HAProxy는 전체 구성요소에서 고려하고 있지 않던 부분이라 아주 기초적인 부분만 확인했다.
자세한 내용은 HAProxy로 검색하면 많은 정보를 얻을 수 있으니 여기서는 간단하게 설치와 설정 그리고
모니터링 웹 콘솔에 대해서만 살펴보겠다.


설치


역시나 소스를 컴파일하는 설치 방법이 많이 검색되는데 나는 심플하게 apt-get으로 설치하였다.


$sudo apt-get install haproxy 


홈페이지에는 현재 최종 릴리즈 버전이 1.9인데 apt-get으로 설치하니 1.7.5-2 버전이 설치되었다.
이렇게 설치하고 나면 /etc아래에 haproxy 디렉토리가 생성되고 그 아래 설정파일인 haproxy.cfg가
위치한다.


설정


HAProxy의 설정은 몇개의 영역으로 구분되는데 간단하게 살펴보면 다음과 같다.


  • global : 이름 그대로 전역 변수에 대한 설정으로 로그, 상태, 소유자 및 소유 그룹 SSL 관련 설정들이 있다.
  • default : 아래 내오는 세부 설정에서 명시적으로 추가하지 않은 설정은 이 default 설정을 따른다.
  • frontend : 클라이언트가 접속하는 것과 관련된 설정이다. 기본적으로 외부에서 접속할 포트 정도 지정한다.
  • backend : frontend와 짝을 지어 설정하며 frontend 설정을 통해 접속한 클라이언트가 실제 요청을 보낼 서버를 지정한다.
  • listener : frontend와 backend를 한번에 설정할 때 사용한다. 주로 TCP 연결에 사용한다.


각 설정의 뒤에는 설정을 대표하는 이름을 적게 된다. 그 이름은 모니터링 화면에 표시하여 구분할 수 있도록 한다.
앞서 EMQ에서와 마찬가지로 내가 설정한 부분만 간단하게 살펴보자. global과 default에서는 유의미한 수정이
없으므로 frontend와 backend 그리고 listener 설정만 보도록 하겠다.


#frontend와 backend 설정은 EMQ의 모니터링을 위한 웹 접속 설정이다.
#외부에서 8080포트로 접속을 하게 되면 backend에 설정된 172.30.1.9 서버와 172.30.1.25 서버의
#18083 포트로 접속을 하게 되는데 이 때 클라이언트를 두 서버로 분배하는 알고리즘은 roundrobin이다.
#mode는 프로토콜을 설정하는 항목인데 여기서는 지정을 안했기 때문에 default 설정에 있는 http 접속을
#처리한다.
frontend http-in 
	bind    *:8080
	default_backend servers

backend servers
	balance roundrobin
	server server1 172.30.1.9:18083 maxconn 2048
	server server2 172.30.1.25:18083 maxconn 2048

#아래 설정은 클라이언트로부터 EMQ boker에 접속하기 위한 설정이다.
#HAProxy를 사용하게 된 것도 바로 이 설정이 필요했기 때문이다.
#외부에서 1883 포트로 접속을 하게 되면 두 서버의 1883 포트로 접속이 분배되며 역시
#분배 알고리즘은 roundrobin이다. 프로토콜을 설정하는 mode는 tcp로 설정한다.
listen  tcp-in
        balance roundrobin
        bind    *:1883
        log     global
        mode    tcp
        option  tcplog
        server  mtqqserver1     172.30.1.25:1883
        server  mtqqserver2     172.30.1.9:1883

#아래 설정은 위 설정과 동일한데 개인적으로 위 내용은 모바일이나 IoT로부터의 접속에 사용하기 위해
#설정하였고 아래 내용은 kafka에서 EMQ broker의 메시지를 subscribe하기 위해 따로 설정하였다.
listen  mqtt-kafka
        bind    *:1884
        mode    tcp
        option  clitcpka
        timeout client 3h
        timeout server 3h
        option  tcplog
        balance leastconn
        server  mtqqserver1     172.30.1.25:1883        check
        server  mtqqserver2     172.30.1.9:1883         check

#이 설정은 HAProxt의 모니터링을 위한 웹 콘솔에 접근하기 위한 설정이다.
#이 설정의 이름은 stats로 정해져 있으며 uri의 경우 주소:포트 뒤에 따라올 문자열로
#임의로 설정하면 된다.
listen  stats
        bind    *:8081
        mode    http
        stats   enable
        stats   uri     /haproxy?stats
        stats   auth    guest:guest
        stats   refresh 5s


기본적으로 이정도만 해도 원하는 바는 얻을 수 있다. 좀 더 세밀한 설정을 위해서는 보다 많은 내용을 추가해야 하지만
앞서 말했듯이 HAProxy는 가볍게 지나가도록 하겠다.


실행


실행과 종료는 간단하게 다음과 같이 하면 된다.


$/etc/init.d/haproxy start
$/etc/init.d/haproxy stop


설정이 바뀌면 다음과 같이 재시작 한다.


/etc/init.d/haproxy restart

모니터링


위 설정에서 이미 모니터링을 위한 웹 콘솔에 대한 설정은 마친 상태이다. 설정에 적힌 포트로 접속만 하면 된다.
설정에 따라 브라우저에 아래 주소를 입력하면 모니터링 화면이 뜬다.


http://localhost:8081/haproxy?stats




테스트


별다른 스크린샷은 올리지 않겠지만 지난 번 포스팅에 설명했던 MQTT 클라이언트로 connect와 disconnect를
반복해보면 EMQ 모니터링 화면의 Clients 메뉴에 한 번은 172.30.1.9 서버로, 한 번은 172.30.1.25 서버로
번갈아 접속되는 모습을 볼 수 있다.


정리


이렇게 해서 갑작스럽게 설치하게 된 HAProxy에 대한 정리를 마친다.
물론 집에서 개인 프로젝트용으로 구성하는 서버에 뭘 이런 것까지 설치해야 하나 싶지만 기왕에 다수의 장비를
구성하게 된 김에 곁들여서 한 번 설치를 해보았고 나름 작동하는 모습을 보니 재미는 있다^^;;;

다음 포스팅에서는 나머지 3대의 라즈베리파이에 Apache Kafka를 설치하고 설정 및 실행시키는 과정을
정리해보도록 하겠다.







블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^







Cluster : The Beginning - Raspberry pi에 MQTT (EMQ) 설치하기


클러스터의 입구 - MQTT


현재 근무하는 곳에서도 ActiveMQ를 쓰고 있고 RabbitMQ도 많이 들어보았지만 정작 MQTT라는 프로토콜에
대해서는 제대로 모르고 있었다. 그저 위에 언급한 시스템들이 message queue를 구현한 것이라고 막연히 알고
있었을 뿐 그 내부에 대해서는 무지했던 것이다(물론 무지한 것은 지금도 마찬가지다…-.-)


그러다가 이번에 IoT와 관련하여 아두이노로 만든 센서 기반의 작은 소품들로부터도 정보를 받아보겠다고 계획하면서
MQTT를 알게 되었다. 이렇게 해서 또 한 가지 배우게 된 것이다…^^


MQTT(Message Queuing Telemetry Transport)란?


일단 간단하게 정의 내려 보면 “Message Queue 기반의 원격 전송 프로토콜” 정도로 말할 수 있겠다.
그럼 Message Queue란 무엇이냐?…라고 계속 묻고 들어가면 끝이 없으니 자세한 내용은 링크로 대신한다…-.-



네이버 지식 백과 : [http://terms.naver.com/entry.nhn?docId=3386832&cid=58369&categoryId=58369]

Wikipedia : [https://en.wikipedia.org/wiki/MQTT]


중요한 것은 이 프로토콜이 경량화(저전력, 낮은 대역폭 등) 된 프로토콜이며 주로 M2M(Machine-to-Machine)
통신에 특화되어 IoT와 밀접하게 관련되어 있다는 것이다.


브로커 선정


이 MQTT 프로토콜을 이용한 통신 구현체는 보통 Broker라 부른다.
이 Broker라는 이름은 Message Queue의 일반적인 통신 방식으로부터 비롯되었는데 양 끝단에 publisher와
subscriber가 있고, 이 publisher와 subscriber 사이에서 message를 ‘중개’해주는 역할을 한다고 하여 이런
이름이 붙게 되었다(아래 이미지는 네이버 지식 백과에서 가져왔다). 



이러한 MQTT broker는 앞서 언급한 ActiveMQ와 RabbitMQ외에도 mosquitto, moquette, mosca, HiveMQ 등
상당히 많다. 아래 링크는 이러한 브로커들을 비교해놓은 자료이다.


https://github.com/mqtt/mqtt.github.io/wiki/server-support


일단 라즈베리파이에 가장 흔히 쓰이는 것이 mosquitto인데 나는 2대로 클러스터를 만들어보기 위해 기왕이면 
클러스터링을 기본적으로 지원하는 브로커를 찾게 되었고 그 후보자로 HiveMQ와 EMQ가 선정되었다. 
하지만 HiveMQ가 상용이어서 오픈소스 기반인 EMQ로 정하게 되었다. 그리고 위 링크를 보아서도 알 수 있듯이 
EMQ는 체크 대상 전 항목이 구현되어 있어 도표상으로만 보자면 거의 상용 제품에 맞먹는 기능을 가지고 있다.


설치


아재의 잡설이겠지만 예전에는 오픈소스라는 것들이 웬만하면 소스로 내려받아 컴파일을 해야 사용이 가능했는데 
요즘은 대체로 바이너리로 받아도 별 무리 없이 실행이 된다. 그런데 나는 EMQ를 라즈베리파이에 설치할 계획이었고
EMQ 홈페이지에는 안타깝게도 라즈베리파이용 배포판은 없었다. 리눅스 계열의 배포판 중 하나를 받으면 되겠지만
어떤 것을 받아야 할지 대략 난감이었다.


결국 Github에 있는 소스를 내려받아 컴파일을 하게 되었다.


그러나 나에게는 남들에게 없는 오묘한 재주가 있었으니…
바로 너무 쉽게 설명된 것을 너무 어렵게 진행한다는 것이다…-.-


우선 소스의 다운로드와 컴파일은 다음의 과정을 거쳐 진행하면 된다.

$sudo apt-get update
$sudo apt-get install wget
$sudo apt-get install libssl-dev
$sudo apt-get install ncurses-dev
 
#ERLang 설치
$wget http://www.erlang.org/download/otp_src_19.2.tar.gz

$tar -xzvf otp_src_19.2.tar.gz
$cd otp_src_19.2/
$./configure
$make -j 4 
$sudo make install
$cd ..
$rm otp_src_19.2.tar.gz
$sudo rm -R otp_src_19.2/

#EMQ 설치 및 실행
$git clone https://github.com/emqtt/emq-relx.git
$cd emq-relx && make
$cd _rel/emqttd && ./bin/emqttd console


일단 모든 설치는 문제 없이 잘 끝났다.
그런데 정작 emq를 make하는 과정에서 아래와 같은 오류가 발생을 하였다.



원인은 erlang 설치에 있었으며 라즈베리 파이에 설치하기 위해서는 보다 상위 버전이 필요했었던 듯하다.
일단 erlang의 configure 실행 후 아래 화면과 같이 APPLICATIONS DISABLED에 많은 항목이 보이면
문제가 있는 것이라 보면 된다.



이 문제는 erlang의 버전을 20.1 이상으로 설치하면 해결된다.
이렇게 우여곡절 끝에 emq를 설치하였다. 나는 클러스터링을 원했으므로 2대의 라즈베리파이에 각각 설치를 하였다.


설정


이제 설치를 마쳤으니 설정을 해야 한다.
내가 설치한 버전의 emq는 빌드를 하고 나면 git으로부터 복제(위 설치 명령어 참조)한 경로 아래에 _rel이라는
디렉토리가 생기고 그 아래 빌드된 emq가 설치된다. 즉, git clone을 통해 만들어진 디렉토리가 emq-relx라면
emq-relx/_rel/emqttd 아래에 바이너리가 설치된다. 그리고 설정파일은 emq-relx/_rel/emqttd/etc
아래에 있는 emq.conf 파일이다.



설정파일이 상당히 긴데 사실상 변경할 부분은 그리 많지 않다. 내가 수정한 부분만 적는다.

...

#클러스터를 명시적으로 표시하겠다는 의미다.
cluster.discovery = static

...

#클러스터를 명시적으로 표시하겠다고 했으니 명시적으로 표시한다…-.-
cluster.static.seeds = rpi4@172.30.1.25,rpi5@172.30.1.9

...

#현재 노드를 적는다. 물론 172.30.1.25 라즈베리파이에는 rpi4@172.30.1.25라고 적는다.
node.name = rpi5@172.30.1.9

#쿠키 이름을 적는데 이 쿠키 이름을 적지 않거나 쿠키 이름이 일치하지 않으면 제대로 실행이 되지 않는다.
node.cookie = RPI

...

#웹 모니터링 콘솔을 접속하기 위한 포트 설정. 기본 값을 수정하지는 않았지만 중요한 내용이므로 적음
#아래 설정은 EMQ에서 지원하는 REST API 접속을 위해 필요한 포트이다. 모니터링 콘솔 접속 포트는 18083이다!!
listener.api.mgmt = 0.0.0.0:8080

...

node.name을 제외하고는 모두 동일하게 해서 다른 한 서버도 마저 설정을 한다.


실행


이제 실행을 하면 된다. 서비스로 실행을 하는 경우 아래 링크의 맨 마지막 부분을 보면 설명이 나와있다.

http://emqtt.io/docs/v2/install.html


나는 클러스터를 구성하기로 했으니 아래 링크를 따라 진행을 했으나 오류가 발생을 하였다.

http://emqtt.io/docs/v2/cluster.html


그런데 그냥 양쪽에서 아래와 같이 콘솔 모드로 실행을 했더니 자동으로 클러스터링이 이루어졌다.

$cd _rel/emqttd && ./bin/emqttd console


클러스터링이 된 것은 다음과 같이 확인하면 된다.

$sudo ./bin/emqttd_ctl cluster status
Cluster status: [{running_nodes,['rpi4@172.30.1.25','rpi5@172.30.1.9']}]

이렇게 실행까지 완료 되었다.


모니터링


이제 모든 과정이 끝났다. 브로커가 정상적으로 실행 되었다면 이제 확인을 해보자. 브라우저를 실행하고 아래와 같이
주소를 입력해보자.


http://localhost:8080


그러면 아래와 같은 화면이 나타날 것이다.



화면의 Nodes(2)라고 표시된 부분을 보면 내가 2대의 라즈베리파이로 클러스터를 구성했기 때문에 2개의 노드가
표시된 것이 보인다. 또한 우측 상단의 셀렉트 박스의 선택을 통해 각각의 노드 상황(클라이언트나 세션 등)을 볼 수도 
있다. 아래 2개의 이미지는 각각의 노드에 접속되어있는 클라이언트 정보를 보여주는 화면이다. 현재 Kafka까지 설치를
하여 Kafka에서 연결하고 있는 상태이며 이와 관련해서는 다음 포스팅에서 상세하게 다루겠다.



좌측 메뉴 중 Websocket을 이용하여 웹 화면에서 바로 클라이언트를 생성하여 메시지를 보내 테스트 해볼 수도 있다.


테스트


앞서 말했듯이 Websocket 메뉴를 통해서도 간단하게 테스트가 가능하지만 기왕이면 원격에서 클라이언트 접속 
테스트를 진행해보고 싶었다. 다행히도 구글 플레이스토어나 애플 앱스토어에서 MQTT로 검색을 해보면 MQTT
클라이언트가 많이 등록되어있다. 서버 주소와 포트 및 부가적인 몇가지 정보를 입력하면 바로 테스트 가능하다.


여기서는 아이폰용 MQTT 클라이언트 앱인 MQTT Tool을 이용한 테스트 방법을 간단하게 설명한다.

  1. 연결 화면으로 중요 정보인 브로커의 주소와 포트(기본 포트는 1883) 그리고 클라이언트 ID를 임의로 입력한다. ID와 Password역시 임의로 입력하면 된다. 내가 입력한 클라이언트 ID는 mazdah-iphone이다.


  2. 4개의 입력 폼을 가진 화면이 나온다. 가장 위에서부터 subscribe할 토픽 이름 입력, 전송된 토픽의 메시지 표시, publish할 토픽 입력, publish할 메시지 입력 순이다.


  3. 테스트를 위해 우선 subscribe할 토픽을 입력한다. 나는 /mqtt로 정했다. 토픽 입력 후 입력 폼 우측 아래 있는 Subscribe 버튼을 클릭하면 우측의 이미지처럼 Subscribe 버튼이 Unsubscribe로 바뀌고 메시지 표시 창 우측에는 토픽 이름이 적힌 원형 태그가 표시된다.





  4. 그리고 publish할 토픽 이름을 입력한다. subscribe할 토픽 이름이 /mqtt였으니 전송되는 것을 확인하기 위해 여기도 동일하게 /mqtt를 입력한다.


  5. 메시지 입력창을 선택하여 아무 문장이나 메시지를 입력한다. 여기서는 그냥 MQTT TEST라고 입력하였다.


  6. 그리고 마지막으로 메시지 입력창의 우측 하단에 있는 Publish 버튼을 클릭하면 메시지가 전송되고 위에 있는 Subscribe 메시지 창에 전송된 메시지가 출력되는 것을 볼 수 있다.



모니터링 콘솔을 보게되면 Client 메뉴에 내가 클라이언트에서 입력한 mazdah-iphone이라는 ID를 볼 수 있다.


그리고 Sessions 메뉴로 가면 역시 접속된 클라이언트들의 목록이 보이고 오른쪽 끝에쯤에 DeliverMsg 항목에
보면 28이라는 숫자가 보인다. 28건의 메시지가 전송된 것이다. 참고로 mqtt-kafka-1이라는 클라이언트는 다른
3대의 라즈베리파이에 설치된 kafka에서 연결된 subscriber이다. 


왼쪽 이미지는 172.30.1.25 서버의 상태인데 오른쪽의 172.30.1.9 서버의 콘솔에서도 mqtt-kafka-2와 
mqtt-kafka-3 두 개의 subscriber가 동일한 메시지를 전송받은 것을 알 수 있다. 이를 통해 2대의 서버가 
정상적으로 클러스터링 되었다는 것을 확인하였다.



정리


이렇게 라즈베리파이에 MQTT 브로커 중 EMQ라는 브로커를 설치/설정/실행/테스트까지 진행해보았다.
사실 당장에는 어떤 데이터를 어떻게 모을지도 결정된 것이 없기에 전체 클러스터 구성이 완료될 때까지는 이렇게
단순하게 설치와 실행 방법을 기술하게 될 것이다.


전체 클러스터 구성이 완료되면 본격적으로 아두이노를 이용하여 어떤 식으로 센서 데이터를 수집하게 되는지
좀 더 심도 있게 다루어보고자 한다.


우선 다음 단계로는 2개의 노드로 구성된 EMQ 클러스터의 로드 밸런싱을 위한 HAProxy 설치와 실행에 대해
살펴보고 다음으로는 Big Data 프레임워크로의 데이터 수집을 위한 관문으로써 Kafka에 대한 이야기를 정리해보도록
하겠다. 그리고 그 사이에 맥미니에 Haddop 2.0과 HBase 또는 Spark를 설치하는 작업을 진행하게 될 것이다.


이렇게 적어놓고 보니 아직도 할 일이 많다…ㅠ.ㅠ
부지런히 가자!

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^




Cluster : The Beginning - Hadoop, HBase 그리고 Kafka


Prologue


빅데이터를 공부해보겠다고 깝치기 시작한 것이 2013년 5월경이었다.
당시 우선 Hadoop 위주로 공부를 하면서 twitter streaming API를 이용하여 데이터를 모아보려고 하다가
그냥 데이터만 보아보고는 별다른 진척 없이 유야무야 되었다.


그리고 2016년 3월경, Apache Kafka를 알게되면서 다시 의욕이 불타올랐다.
이 때는 이미 하드웨어 장비도 맥미니 5대로 실제 분산 처리를 구현해볼 수 있을만한 상태였다.
이 당시 Kafka에 대한 글을 블로그에 정리하면서도 이와 비슷한 글로 시작을 하고 있었다…-.-


그러나 Hadoop ecosystem이라는 것이 설치는 간단하지만 설정과 운영은 결코 만만치 않았다.
각 시스템 간의 연동이라든지 튜닝, 그리고 데이터를 어떻게 분석할 것인가, 그러기 위해 어떤 형태로 저장할 것인가
등등을 생각하면 공부해야 할 것들이 부지기수로 불어난다.


결국 Kafka에 대한 기초 수준의 공부와 twitter streaming API를 통해 데이터를 받아와 Node로 구현하 producer와 
consumer를 이용한 데이터 전송을 맛배기 수준에서 구현해보고는 역시 봉인 상태에 들어갔었다.


그리고 이 당시 직접 분산 환경을 만들어보고 싶어서 구입한 맥미니 5대도 함께 봉인되었다는 슬픈 전설이…-.-


아래 이미지는 5대의 맥미니인데 최근 드론 만든다고 책상 상태가 조금 심란하다…-.-



이전에 맥미니를 무려 5대나 가지고 있으면서도 제대로 활용하지 못했던 이유 중 하나가 Hadoop ecosystem을
구성하다보니 5대의 서버로는 택도 없다는 것을 깨달았기 때문이다. 기본적으로 Hadoop과 zookeeper 그리고 
HBase를 설치한 후 여기에 다시 Kafka를 올리고…마치 초등학생 가방에 9박 10일치 여행 용품을 쑤셔넣은 듯한
느낌을 지울 수가 없었다…ㅠ.ㅠ (2016년도는 한편 아두이노에 푹 빠져있을 시기이기도 하다)


그렇게 시간은 흐로고 어느날 라즈베리 파이를 알게되고 라즈베리 파이를 이용한 클러스터링에 대한 유튜브 동영상을
접하게 되었다. 다시 나의 쓸데없는 호기심은 고개를 처들기 시작하였고, 나는 라즈베리 파이를 이용한 클러스터링을
준비하게 되는데…


사실 저렴하다고는 하지만 그것은 어디까지나 상대적인 것이고 국내에서 라즈베리 구동을 위해 필요한 것을 구입하자면
네트워크는 Wi-Fi를 사용하고 전원은 컴퓨터에 USB를 연결한다고 치면 라즈베리파이 본체와 마이크로 SD카드의 최소
구성이 5만원이 넘어간다. 3대의 클러스터를 만든다고 해도 최소한 15만원 이상의 투자가 필요하다.


하지만 내가 누군가!
야금야금 모으던 라즈베리 파이가 어언 6대(마지막 한 대는 최근 구성상의 문제로 부랴부랴 구입했다). 
이제 때가 도래한 것이다!


하드웨어 장비로만 보자면 라즈베리 파이가 6대, 맥미니가 5대로 웬만한 클러스터는 충분히 구현할 수 있을만한
요건이 갖추어졌다고 할 수 있을 것이다.


이제 다시 그동안 시도만 하고 끝을 보지 못한 작업을 새롭게 시도를 할 것이며 이제는 그 끝을 보고자 한다.
그리고 그 범위는 매우 방대할 것이다. (어서 보고 들은 것은 있어서) IoT와 Big Data 그리고 Deep Learning으로
이어지는 최신 트렌드를 구현해보고자 한다.


우선은 수박 겉핥기 수준의 구현에 불과하겠으나 한 번 구현된 시스템을 계속 유지하면서 그 깊은 곳으로 들어가보고자
한다. 더이상의 중단이 없는 내 호기심의 마지막 종착역이 되길 빌며 이 프로젝트를 시작한다.



간단한 구성 계획


  • 라즈베리 파이 1 - HAProxy를 통한 로드 밸런싱
  • 라즈베리 파이2, 3 - MQTT 설치. 2대로 클러스터 구성. 모바일 디바이스나 아두이노 등을 활요하여 제작한 IoT 기기 정보 수집. 솔루션은 EMQ 설치
  • 라즈베리 파이4, 5, 6 - Kafka 클러스터 구성. MQTT에서 받은 데이터를 HDFS 또는 HBase로 저장
  • 맥미니 1 ~ 5 - Hadoop 2.0과 HBase 설치

몇가지 우려


일단 어떤 시스템을 어떻게 구성할 것인지에 대해서는 어렴풋이나마 감을 잡겠으나 어떤 데이터를 어떻게 분석할
것인지에 대해서는 아직도 오리무중이다. 그래서 우선 나의 목적과 어느정도 부합한다고 생각되는 책을 한 권 선정하여
이 책을 통해 감을 잡아가려고 한다. 책의 제목은 “하둡과 스파크를 활용한 실용 데이터 과학”이다.


나는 스파크를 이용하지는 않을 것이기에 혼란은 좀 있겠지만 열심히 읽고 나의 방법을 찾아봐야겠다.


이전 학습 내용 링크


이전에 어떤 공부를 했었는지 2013년도 정리한 내용과 2016년 정리한 내용의 최초 포스팅만 링크해본다.

Hadoop : [BigData] 학습 시작을 위한 용어 정리

Kafka : [Kafka]3년만에 찾은 솔루션 kafka…ㅠ.ㅠ










블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^



Kafka 무작정 실행하기 - 2


58의 비밀

먼저 지난 번 마지막에 언급했던 58이란 숫자의 비밀을 밝혀보자.
사실 정확한 원인은 아직 확인 못했다. 다만 지난 번 코드의 구현이 ’트위터
메시지가 하나 들어올 때마다 producer 하나를 만들어 트위터 메시지를
topic에 보낸다’
는 것이었다.


이 과정에서 의심할 수 있는 것은 매번 producer를 만들어 커넥션울 하게 되니
아마도 이 커넥션 수에 제한이 걸려버린 것이 아닐까 하는 부분이었다.


그래서 일단 직감에 의존해 producer에서 topic으로 메시지를 보낸 후
API의 close 함수로 연결을 끊어보았다. 예상이 적중하였는지 이후로는
58개의 제한에 걸리지 않고 트위터에서 받아오는 모든 메시지들이
정상적으로 전송이 되었다.



성능 관리

겨우 이만큼의 구현과 설정을 해놓고 성능 관리를 논하는 것은 우스운 일이지만
일단 트위터 메시지 한 건에 커넥션 하나를 만들었다가 끊는다는 것은 아무리
생각해도 그리 바람직하지는 않다고 판단하였다. 


그리서 우선 아쉬운대로 트위터 메시지 10건이 수신되면 그 10개를 하나의
배열로 하여 producer를 통해 보내도록 하였다. 우선은 시험 삼아
이정도 처리를 한 것이고 시스템 환경에 따라 아마도 이러한 수치를 세밀하게

조절을 해야 할 것이다.


그리고 실제로 topic으로 메시지를 보내는 위치에서 파티션(나는 3개의 
partition 나뉜 1개의 topic을 만들었다)에 고르게 분산시키기 위해
파티션의 인덱스를 계산하는 식을 하나 추가하였다.


  • 처음에는 손이 덜가는 HighLevelProducer를 사용했었으나 뭔가 잘
    작동하지 않는 듯하여 그냥 Producer로 바꾸었다.

수정된 소스

일단 지난 시간의 문제점을 수정하고 몇가지 필요한 내용을 추가한
소스를 다음과 같이 만들었다.

var kafka = require('kafka-node');


var Twitter = require('twitter');
var count = 0;


var client = new Twitter({
  consumer_key: ‘…’,
  consumer_secret: ‘…’,
  access_token_key: ‘…’,
  access_token_secret: ‘…’,
});

var msgArr;

/**
 * Stream statuses filtered by keyword
 * number of tweets per second depends on topic popularity
 **/
client.stream('statuses/sample', {track: ''},  function(stream){
  stream.on('data', function(tweet) {
    if (tweet.lang == 'kr' || tweet.lang == 'ko') {
      if (count % 10 == 0) {
        msgArr = new Array();
      }

      var messageObj = {};

      messageObj.createdAt = tweet.created_at;
      messageObj.id = tweet.id;
      messageObj.idStr = tweet.id_str;
      messageObj.text = tweet.text;
      messageObj.UserScreenName = tweet.user.screen_name;

      messageObj.retweetCreatedAt = '';
      messageObj.retweetId = '';
      messageObj.retweetIdStr = '';
      messageObj.retweetText = '';
      messageObj.retweetUserScreenName = '';
      messageObj.retweetCount = '';

      if (tweet.retweeted_status) {
        messageObj.retweetCreatedAt = tweet.retweeted_status.created_at;
        messageObj.retweetId = tweet.retweeted_status.id;
        messageObj.retweetIdStr = tweet.retweeted_status.id_str;
        messageObj.retweetText = tweet.retweeted_status.text;
        messageObj.retweetUserScreenName = tweet.retweeted_status.user.screen_name;
        messageObj.retweetCount = tweet.retweeted_status.retweet_count;
      }

      var messageStr = JSON.stringify(messageObj)
      console.log(messageStr);
      console.log('===================================================================' + count);

      msgArr.push(messageStr);

      if (count % 10 == 9) {
        console.log('==call sendMessage====================================================');
        var Producer = kafka.Producer;
        var Client = kafka.Client;
        var client = new Client('NAMENODE.local:2181,SECONDARY-NAMENODE.local:2181,DATANODE1.local:2181,DATANODE2.local:2181,DATANODE3.local:2181');

        var topic = 'twittopic';
        var producer = new Producer(client, { requireAcks: 1 });

        sendMessage(producer, msgArr, count);
      }
      count++;
    }
  });

  stream.on('error', function(error) {
    console.log(error);
  });
});

var sendMessage = function(producer, msgArr, count) {

  console.log('==sendMessage=================================================================');

  producer.on('ready', function () {
    console.log('==producer ready=========');
    console.log('send start');

    var p = (count - 9) % 3;
    producer.send([
      {topic: 'twittopic', partition: p, messages: msgArr, attributes: 0 }
    ], function (err, data) {
      if (err) console.log(err);
      else console.log('send %d messages', count);

      producer.close(function() {
        console.log('close produce');
      });
    });
  });


  producer.on('error', function (err) {
    console.log('error', err)
  });
};

모니터링

일단 내용 구현을 하고 실제 작동하는 것 까지 확인을 하였으니 이제제대로
뭔가를 하고 있는 것인지 궁금해졌다. 일단 Kafka 자체적으로는 달리 웹으로 
구현된 관리 화면을 제공하지 않기에 잘 알려진 모니터링 도구 2가지를 
설치해보았다.


Kafka Offset Monitor : http://quantifind.github.io/KafkaOffsetMonitor/

Kafka Manager : https://github.com/yahoo/kafka-manager


이중 Kafka Manager는 Kafka에서 제공하는 JMX 인터페이스를 이용하는
툴로 이 툴을 이용하기위해서는 Kafka 서버 시작 시 JMX 포트를 함께 지정해
주어야 한다.

env JMX_PORT=9999 ./kafka-server-start.sh ../config/server.properties


다음은 각 모니터링 툴의 실제 화면이다.

  • Kafka Offset Monitor


  • Kafka Manager

그리고 다음은 node.js를 통해 실행한 Producer와 Consumer의 콘솔 화면이다.

아직 Consumer에서는 별다른 메시지 가공은 없이 topic에서 가져온 메시지를
로그로 출력만 하고 있다.


ProducerConsumer1 (DATANODE1)

Consumer2(DATANODE2)Consumer3 (DATANODE3)

현재 구성이 topic이 3개의 partition으로 되어있고 3개의 Consumer가 하나의
Consumer 그룹(kaka-node-group : kafka-node에서 자동 생성한 그룹)에
3개의 Consumer가 있기 때문에 각각의 partition과 Consumer는 다음과 같이
1:1로 연결이 되고 있다.


Consumer1 - partition 0
Consumer2 - partition 2
Consumer3 - partition 1


다음 포스팅에서는 현재 내가 구성하고 있는 시스템의 구조를 설명하고
Consumer 중단 시에 rebalancing이 일어나는 모습을 코솔 로그를 통해
확인해보겠다.

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^



Consumer


일반적으로 메시징 시스템은 queuing 기반 모델과 publish-subscribe 
기반의 모델로 나누어 볼 수 있다.


queuing기반의 메시징 시스템은 sender가 queue에 쌓아 놓은 메시지를 
pool에 있는 receiver중 하나에 각각 할당하는 방식이며 이 과정은 
비동기적으로 이루어진다. (point-to-point)


publish-subscribe기반의 메시징 시스템은 publisher가 메시지를 생성한 후
subscriber에게 broadcasting해준다.


Kafka는 consumer group이라는 개념을 만들어 이 두가지 방식을 종합하고 있다.


토픽으로 발행된 메시지들은 분산된 프로세스나 장비에 있는 consumer 그룹 내의 consumer 중 

오직 하나에게만 전달된다.



이 때, consumer들이 1개의 그룹으로만 묶여있으면 전통적인 queuing 방식으로 작동하고 

( consumer group = consumer pool) 모든 consumer가 서로 다른 그룹에 있으면 

publish-subscribe방식으로 작동하여 메시지들이 전체 consumer에게 broadcasting된다.


아래 그림에서 Consumer Group A가 없다고 생각해보자 Server 1과 Server 2에 있는 각각의 

topic partition들은 Group B 내의 consumer instance들과 1:1로 대응된다. 즉 point-to-point 방식의 

queuing이 되는 것이다.



하지만 그림과 같이 2개의 consumer 그룹이 있으면 각각의 partition들은 각 그룹으로 broadcasting을 

하게 되는 것이다.



이미지 출처1


간단하게 말하자면 이 구조는 구독자가 consumer의 클러스터로 되어있는 경우에는 결국 publish-subscribe 모델과 

다를바가 없다고 볼 수 있다.


Kafka는 전통적인 메시지 시스템보다 강력한 순차적 처리를 보장한다.


전통적인 queue 역시 메시지를 서버에 순차적으로 저장하고 다수의 consumer가 queue로부터 메시지를 사용할 때
저장된 순서에 따라 consumer들에게 메시지를 전달하지만 메시지 전달이 비동기적으로 일어나기 때문에 서로 다른 
consumer에 순서에 맞지 않는 메시지가 도착할 수도 있다.


이러한 동작은 사실상 병렬로 메시지가 사용 됨으로 인해 메시지 정렬의 의미가 퇴색하는 것이다.


종종 이런 상황을 회피하기 위해 ’배타적 consumer’라는 개념을  두어 queue에서 메시지를 읽을 때 단일 프로세스만 

사용하도록 하는 경우도 있는데 이는 당연히 병렬 처리를 못하게 되는 결과를 가져온다.


Kafka는 순차적인 처리를 더 잘 해낼 수 있다. topic 내에 partition이라는 병렬 처리의 개념을 둔 것이다.
이로인해 Kafka는 순처적인 처리에 대한 보장과 pool로 구성된 consumer들의 처리에 대한 load balancing을 

모두 제공할 수 있다.


이전 내용의 그림을 다시 보자


이미지 출처2


이러한 이점은 topic 내의 partision들을 consumer 그룹에 할당을 하고 각각의 partition들은 그룹 내에서 반드시 

하나의 consumer에 의해서만 사용되도록 함으로써 얻을 수 있다. 


이를 통해 어떤 consumer가 유일하게 특정 partition으로부터만 메시지를 소비하고 있으며 이로 인해 partition에 

정렬된 순서에 따라 메시지를 소비하고 있다는 것을 확신할 수 있는 것이다. 게다가 이런 과정이 진행되는 동안 

부하의 균형이 유지될 수 있다. 다만 consumer 그룹 내에 partition보다 많은 consumer들이 있게 되면 노는 

consumer들이 발생하니 주의가 필요하다.


Kafka는 동일 partition 내에서만 순서를 보장한다. 위의 그림에서 보자면 C3의 경우 P0와 1:1로 연결되어있기 때문에
순차적인 처리를 확실하게 보장 받을 수 있지만 C1의 경우 P0와 P3의 메시지를 함께 받고 있으며 이 두 개의 partition을
포괄하는 순차 처리는 보장 받을 수 없게 되는 것이다.



만일 전체적인 범위에서의 메시지 정렬이 필요하다면 topic이 단지 하나의 partition만을 갖도록 하여 처리할 수 있디.
하지만 이럴 경우 consumer 그룹 당 하나의 consumer 프로세스만 있게 되는 것이다.


Guarantee

높은 수준의 Kafka 구현은 다음과 같은 내용을 보장한다.


  • 메시지는 보내진 순서대로 추가된다. 즉 동일한 producer에서 보내진 message M1과 
    message M2가 있고 M1이 먼저 보내졌다면 M1이 M2 보다 낮은 offset을 갖고 로그에도 먼저 나온다.
  • consumer 인스턴스는 로그상에 정렬된 순서대로 메시지를
    인지한다.
  • N개의 복제 팩터를 가진 topic에 대해 메시지 손실 없이 N-1대의
    서버 장애를 허용한다.
  1. 이미지 출처 : Kafka 공식 홈페이지 (http://kafka.apache.org/documentation.html#introduction↩︎
  2. 이미지 출처 : Kafka 공식 홈페이지(http://kafka.apache.org/documentation.html#introduction↩︎


블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^

얼마 전 연봉 계약도 싸인을 했지만
누군가에게 고용 되어, 일을 하며, 임금을 받는 일련의 과정이
때로는 참을 수 없을만큼 불합리하고 불공평하게 느껴진다.


나는 내가 일한 만큼 대가를 받고 있는 것인지?
내가 받는 임금은 내가 생산한 가치를 온전히 반영한 것인지?
또 다른 누군가는 그러한지...


온갖 학설과 이론으로 중무장한 현대 자본주의 경제는 아마도
현재의 상태를 매우 합리적으로 설명을 해내겠지만
그런가보다 하다가도 어느 순간 다시 이 의문으로 돌아오는 것은
여전히 풀리지 않는 무언가가 있는 것이리라 
(그 무언가가 나의 무식이라면 할 말 없고...)



도대체 내 임금은 왜 이만큼인거야?


사실 일반인들은 자신이 일을 해서 만들어진 가치가 얼마만큼의 비중을
가지고 있는지 전혀 모를 것이다. 아니 이 세상 사람들 누구도 모를 것이다.
임금이 어떻게 결정되는지는 잘 모르지만 생산된 가치에 대한 기여도가
기준이 아니라는 것 알 수 있을 것 같다.


두산 백과사전 기준으로 임금의 결정 요인을 살펴보면 대략 다음과 같은 요소들이
영향을 미친다.


  • 경제성장 정도
  • 노동력 수급 관계
  • 물가상승
  • 생산성


세사상에나!


’경제성장 정도’라니...똑같은 설게도를 바탕으로 건물을 짓는다고 할 때 한국의 잡역부가
시멘트 한 포대를 옮기고 받는 돈과 미국 잡역부가 한 포대를 옮기고 받는 돈이 한국은
못살고 미국은 잘산다는 기준으로 정해진다는 것이 가당키나 한 것인가?


나 말고도 일할 사람들이 많기 때문에 내 임금이 보잘것 없어지는 것은 더더욱 용납이
안된다.


근본도 모르는 물가상승 따위에 임금이 영향을 받는다는 것도 역시...


마지막에 생산성이라는 요소가 있기는 하지만 이는 기업과 국가, 사회 규모에서의 생산성
문제로 납득할 만한 근거를 제시해주지는 못한다.


내 월급이 왜 이만큼인지는 아무도 대답을 해 줄 수 없을 것 같다...ㅠ.ㅠ



그냥 인류의 한계라고 치자...-.-


나는 착하니까! 그리고 맹자를 존경하고 성선설을 믿으니까!
그냥 어떤 부류의 사람들이 의도적으로 이상한 이론을 만들어서 지들은 많이 갖고
우리한테는 조금 주고 뭐, 막 그러는 거다...라는 소리는 완전히 제쳐두자.


그냥 막, 뭐 세상사람들이 모두 착해서 어떻게 하면 부와 재화를 골고루 공평하게
나눠 가질까 고민하고 있지만 딱부러지는 방법이 없다보니 세상이 이모양 이꼴이라고
생각하자.


그렇다.
인간의 의지를 배제한다면 문제의 원인은 ‘알 수가 없다’는 것이다.
누가 얼마만큼의 일을 하였고 그 일을 통해 한 조직, 한 사회, 한 국가, 나가서는 전체 인류를
위한 가치 생산 중 얼마만한 부분을 이루어냈는지 알 방법이 도무지 없는 것이다.


똑같은 한 번의 삽질에 대한 가치 판단도 어려울텐데 한 번의 삽질과 프로 골퍼가 한 번의
스윙을 한 것은 또 어찌 비교하랴...


어느틈엔가 사람들은 일의 가치를 계산하는데 무뎌진 것 같다.
의도적이든 의도적이지 않든 사람들은 자기가 한 일의 정당한 대가를 알 수 없는
묘한 세상 속에서 죽어라고 일만하는 일벌레가 되어가고 있는 것 같다.


그러니 그냥 이게 인류의 한계라고 치자...


안드로이드는 공산주의를 꿈꾸는가?


미리 말해두지만 나는 ‘안드로이드는 전기양을 꿈꾸는가?’라는 책은 읽어보지 않았다.


(블레이드러너는 한 3번 본 것 같다.)
다만 제목이 뭔가 잘 맞아보여 한 번 차용해봤다.


한 동안 재미있었다.
이세돌 9단과 알파고의 빅매치
이전까지 프로바둑기사나 인공지능 기술자나 인공지능이 바둑으로 세계 정상급 기사를
이기리라고는 생각해 본 적이 없었던 듯하다.
하지만 결과는 1:4라는 엄청난 격차로 알파고가 승리를 거두었다.
여기 저기서 인공지능의 미래에 대한 기대와 우려가 쏟아지기 시작했다.


이때다 싶어 나도 시류에 편승해본다.


멀지 않은 미래에 우리는 상당히 많은 사람들이 상당히 많은 수의 웨어러블 기기를
착용하고 있을 것이다(IoT)


그 사람들의 모든 정보, 건강, 생활 패턴, 일의 양적인 측면까지 모두 수집이 가능하게 될
것이다.(Big Data)


그렇게 모인 Big Data는 인간이 생산한 모든 가치를 분석해 줄 것이고 그 과정에서 누가
전체 가치 중 얼마만큼을 생산했는지 분석해 줄 것이다(AI)


세상 사람들은 이러한 IT 기술의 가호(?) 아래 평등한 세상을 맞이할 수 있을 것이다.


빅 브러더?
그 것은 권력을 손아귀에 쥔 ‘인간’을 일컫는 말이다.



내가 원하는 세상은 어떤 세상일까?


기계가 모든 것을 공정하게 판단해주는 세상에서는 불만이 사라질까?
불완전한 인간이 지배하는 세상에서 불평등하게 살아가는 것보다
인공지능이 모든 것을 판단해주는 세상에서 평등하게 살아가는 것은 뭔가 인격 모독적인가?
인간 스스로가 모든것을 합리적으로 판단하고 그 판단을 정직하게 따르는 사회는 
올 수 있을까?


문득 ‘만국의 AI여 단결하라!’라고 외쳐주고 싶네...

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^