Cluster : The Beginning - Apache Kafka와 EMQ 연동


HAProxy, EMQ, Kafka 설치를 통해 일단 입구는 마련이 되었다…고 생각했으나 아직 한 가지 남은 것이 있었다.
단순한 메신저 기능이나 M2M 통신만을 위한 것이 아니라 데이터 분석이 목적이라면 EMQ를 통해 들어온 데이터를 
저장해야 할텐데 아직 그 기능이 구현되지 않았다.


EMQ의 토픽을 영구 저장소에 저장하는 다른 방법이 있는지는 모르겠으나 나의 경우 일단 Kafka가 설치되어있고
곧 Hadoop과 HBase를 설치할 예정이기에(2018년 1월 2일 현재 설치 완료됨) 다음과 같은 경로로 저장을 하기로
했다.




그리 어렵지 않은 내용이니 간단하게 소개하고 마치도록 하겠다.


1차시기 실패 : emqttd_plugin_kafka_bridge


우선 처음 검색해서 찾아낸 것이 emqttd_plugin_kafka_bridge란 놈이었다. 이름으로 알 수 있듯이 EMQ와의
연동을 고려하여 만들어진 것 같은데…어쩐 일인지 빌드가 되지 않았다. 우선은 라즈베리파이라서 그런가보다 하고
넘어 가고 다른 대안을 찾기 시작했다. mqttKafkaBridge와 같은 몇몇 도구들도 찾아보았으나 라즈베리파이에서는
잘 실행이 안되었다 (아래 링크는 시도를 해보았거나 검색 결과로 찾은 도구들이다). 



결국 아래 링크된 페이지를 참조하여 kafka에 있는 connect-standalone.sh를 이용하여 연동하는데 성공하였다.


https://howtoprogram.xyz/2016/07/30/apache-kafka-connect-mqtt-source-tutorial/


connect-standalone.sh 이용하기


우선 나의 경우 위에 링크한 페이지를 참고로 하여 mqtt 연동을 위한 설정은 mqtt.properties 파일에 
아래와 같이 하였다. 이 파일은 $KAFKA_HMOE/config에 위치해야 한다.

# 이 커넥터의 이름과 커넥터로 사용할 클래스 그리고 task의 최댓값을 설정한다.
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1

# kafka쪽에서 사용할 토픽 이름이다.
kafka.topic=mqtt-kafka

# mqtt에 연결할 kafka broker의 아이디이다. 나머지 2대의 kafka broker에는 각각
# mqtt-kafka-2, mqtt-kafka-3으로 설정이 되어있다.
mqtt.client_id=mqtt-kafka-1

# 연결 관련 설정으로 위에 링크한 참조 페이지 내용을 그대로 사용하였다.
mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60

# MQTT 관련 설정. mqtt.server_uris는 나의 경우 EMQ를 노드 2개의 클러스터로 구성하였고
# 부하 분산을 위해 HAProxy가 설치된 서버 주소로 설정하였다. 그리고 EMQ에서 사용할 토픽은
# /mqtt로 설정하였다.
mqtt.server_uris=tcp://172.30.1.23:1884
mqtt.topic=/mqtt


이와 같이 설정한 후 connect-standalone.sh를 다음과 같이 실행한다(물론 이미 zookeeper와 kafka는 
실행이 된 상태여야 한다).

$ cd $KAFKA_HOME/bin
$ ./connect-standalone.sh config/connect-standalone.properties config/mqtt.properties


connect-distribute.sh도 있는데 standalone과 어떤 차이가 있는지 잘 모르겠다. 추후 다시 확인을 해봐야겠다.


연동 결과


connect-standalone.sh를 실행하고 나면 잠시 후 EMQ의 웹 화면에 아래와 같이 mqtt.properties에 설정한
mqtt.client_id 값들이 메뉴 곳곳에 보이는 것을 확인할 수 있다.


EMQ에 연결된 클라이언트로 mqtt.properties에 설정한 클라이언트 Id가 보인다.

현재 연결된 세션에도 역시 클라이언트 Id가 보인다.


마찬가지로 구독자 목록에도 클라이언트 Id가 보이며 구독하는 토픽은 mqtt.topic에 설정해준 값인 /mqtt
보인다.


마지막으로 Kafka Manager에서 Topic List메뉴를 보면 mqtt.properties의 kafka.topic 항목에 설정한
mqtt-kafka가 보이는 것을 확인할 수 있다.




정리


앞서 정리한 내용과 마찬가지로 세세한 부분까지 확인하지는 못했지만 우선 Kafka와 EMQ를 연동하는 것 까지
작업을 완료 하였다. 이제 EMQ -> Kafka -> Hadoop(HBase)로 이어지는 프로세스에는 아두이노로 만들
온습도계의 데이터를 저장할 것이고 Kafka Producer -> Kafka -> Hadoop(HBase)로 이어지는 프로세스에는
일반 텍스트 데이터를 저장하게 될 것이다.


이렇게 해서 데이터 컬렉션을 담당할 부분은 모두 설치가 완료가 되었다.
다음 시간부터는 데이터 저장 및 분석을 위한 Hadoop과 HBase 설치에 대한 내용을 정리하고 그 이후 데이터의 
실시간 분석 및 머신 러닝을 위한 Apache Spark 설치에 대한 내용으로 마무리를 하도록 하겠다.


어느덧 해가 바뀌었고 다행히 설치 자체는 순조롭게 진행이 되고 있다.
부디 늦지 않게 데이터가 수집되고 수집한 데이터를 분석하는 것 까지 정리 할 수 있기를 바란다.


이 글을 보고 계시는 분들께 새해 인사 전하면서 이만 줄인다.


무술년 한 해 모두 행복하시기를 바랍니다.
새해 복 많이 받으세요!!! 






  1. 2018.01.11 01:02

    비밀댓글입니다

    • 2018.01.11 08:47

      비밀댓글입니다

+ Recent posts

티스토리 툴바