Apache Kafka 시작하기


참조 사이트

http://kafka.apache.org
http://epicdevs.com/20

다행이 예전에 Hbase를 설치할 때 zookeeper를 설치해놓은 덕에 kafka를 설치하고 구동하는 과정은 

그리 어렵지 않았다.아다시피 요즘 OS가 Windows만 아니면 binary 패키지를 다운로드 받고 적절한 위치에 

압축 풀고 하면반은 된 것이나 다름 없다.


kafka의 경우 $KAFKA_HOME/config/server.properties에 몇가지 설정만 한 후 기동시키면 된다.
다음은 내 PC 환경에서의 설정 및 구동 과정이다.


PC 환경

현재 맥미니 5대를 내부망으로 연결시킨 상태로 각각의 PC에 대한 정보는
/etc/hosts 파일에 설정이 되어있다. 대략 다음과 같다.

172.30.1.33     NAMENODE.local
172.30.1.56     SECONDARY-NAMENODE.local
172.30.1.59     DATANODE1.local
172.30.1.39     DATANODE2.local
172.30.1.11     DATANODE3.local

때문에 대부분의 설정에서는 host name인 NAMENODE.local 등을 이용한다. 이후
설정 사항에 이점 참고하길 바란다.


zookeeper 설정

zookeeper는 5대의 노드 모두 설치가 되어있으며 대략 다음과 같이 
설정이 되어있다. 설정 파일은 $ZOOKEEPER_HOME/cont/zoo.cfg이다. 
물론 5대의 노드 모두 동일한 설정이다.

dataDir=/zookeeper
clientPort=2181

#Zookeeper Servers
server.0=NAMENODE.local:2888:3888
server.1=SECONDARY-NAMENODE.local:2888:3888
server.2=DATANODE1.local:2888:3888
server.3=DATANODE2.local:2888:3888
server.4=DATANODE3.local:2888:3888

zookeeper의 실행 명령은 다음과 같다. 
(zookeeper 설치 위치는 zookeeper-3.4.5다

NAMENODE1:bin mazdah$ /zookeeper-3.4.5/bin/zkServer.sh start

kafka 설치, 설정, 구동

  • 설정

앞서 말했듯이 다운로드 압축해제 적절한 위치 복사면 모든 설치는 끝이다. 내가 받은 버전은 

kafka_2.11-0.9.0.1이며 /kafka에 설치했다.


우선 설정파일을 열고 다음과 같이 설정을 하였다. 설정 파일은 /kafka/config/server.properties다.
가장 먼저 설정해주어야 할 부분은 broker.id이다. 

나는 총 5대의 노드가 있기 때문에 각각의 PC(node)에 0부터 4까지 아이디를 부여해주었다.
예를들어 NAMENODE.local의 설정은 다음과 같다.

broker.id=0

다음으로는 zookeeper에 대한 설정으로 zookeeper.connect라는 항목에 
앞서말한 5개의 노드를 모두 적어주었다.

zookeeper.connect=NAMENODE.local:2181,SECONDARY-NAMENODE.local:2181,DATANODE1.local:2181,DATANODE2.local:2181,DATANODE3.local:2181

나는 처음에 이 부분에 적는 2181 포트가 도대체 어떤 포트인가 고민하다가 나중에 zookeeper의 cleintPort 

설정을 보고는 이해했다…-.- 그밖의 나머지 설정은 그냥 default로 두었다.


나중에 추가한 설정이 있는데 이 것은 나중에 상황을 설명하면서 함께 이야기 하겠다.

  • kafka 서버 기동

이제 kafka 서버를 기동해보자. 각각의 노드에서 /kafka/bin으로 이동한 후 다음과 같이 입력하여 기동한다.

NAMENODE:bin mazdah$ ./kafka-server-start.sh ../config/server.properties

아래와 같이 kafka 설정 정보들이 죽 나오고 이후 서버 start 메시지가 출력되면서 기동된다.

[2016-03-20 11:38:57,142] INFO KafkaConfig values: 
advertised.host.name = null
metric.reporters = []
quota.producer.default = 9223372036854775807
offsets.topic.num.partitions = 50
.
.
.

이후 테스트는 http://epicdevs.com/20의 내용을 그대로 진행해보았다.
물론 서버 정보 등은 나의 환경에 맞춰서..

  • topic 생성

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --create --zookeeper SECONDARY-NAMENODE.local:2181 --replication-factor 5 --partition 20 --topic test

그런데 처음부터 문제가 생겼다. 위와같이 시작을 했는데 아래와 같은 오류가 발생을 했다.

Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 30000
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1223)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:155)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:129)
at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:89)
at kafka.utils.ZkUtils$.apply(ZkUtils.scala:71)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
at kafka.admin.TopicCommand.main(TopicCommand.scala)

ps 명령어로 각 노드에서 확인을 해보아도 모든 zookeeper 인스턴스들이 정상적으로 떠있는 상태였는데 

zookeeper 연결시 타임아웃으로 연결할 수 없다니…ㅠ.ㅠ


일단 퍼뜩 떠오르는 생각은 zookeeper의 leader가 NAMENODE.local에 설정되어 있다는 것이다. 

아마도 topic 생성 등의 중요 작업은 zookeeper leader 노드에서 해야 하는 것 같다. 

다음과 같이 NAMENODE.local에서 topic이 생성되도록 하니 정상적으로 topic이 생성되었다.
(실제로 zookeeper의 leader node에서 실행하지 않아도 —zookeeper 옵션만leader node로 해주어도 된다). 
다음과 같이 성공하였다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --create --zookeeper NAMENODE.local:2181 --replication-factor 5 --partition 20 --topic test

Created topic "test".
  • topic 리스트 보기

자 그러면 이제 topic 리스트를 한 번 보도록 하자.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --list --zookeeper NAMENODE.local:2181

test

아직은 만들어진 topic이 test 하나밖에 없기 때문에 test만 출력된다.

  • topic 상세 보기

그리고 다음과 같이 test topic의 상세 내용을 확인해보자.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --describe --zookeeper NAMENODE.local:2181 --topic test

Topic:test	PartitionCount:20	ReplicationFactor:5	Configs:
Topic: test	Partition: 0	Leader: 0	Replicas: 0,1,2,3,4	Isr: 0,1,2,3,4
Topic: test	Partition: 1	Leader: 1	Replicas: 1,2,3,4,0	Isr: 1,2,3,4,0
Topic: test	Partition: 2	Leader: 2	Replicas: 2,3,4,0,1	Isr: 2,3,4,0,1
Topic: test	Partition: 3	Leader: 3	Replicas: 3,4,0,1,2	Isr: 3,4,0,1,2
Topic: test	Partition: 4	Leader: 4	Replicas: 4,0,1,2,3	Isr: 4,0,1,2,3
Topic: test	Partition: 5	Leader: 0	Replicas: 0,2,3,4,1	Isr: 0,2,3,4,1
Topic: test	Partition: 6	Leader: 1	Replicas: 1,3,4,0,2	Isr: 1,3,4,0,2
Topic: test	Partition: 7	Leader: 2	Replicas: 2,4,0,1,3	Isr: 2,4,0,1,3
Topic: test	Partition: 8	Leader: 3	Replicas: 3,0,1,2,4	Isr: 3,0,1,2,4
Topic: test	Partition: 9	Leader: 4	Replicas: 4,1,2,3,0	Isr: 4,1,2,3,0
Topic: test	Partition: 10	Leader: 0	Replicas: 0,3,4,1,2	Isr: 0,3,4,1,2
Topic: test	Partition: 11	Leader: 1	Replicas: 1,4,0,2,3	Isr: 1,4,0,2,3
Topic: test	Partition: 12	Leader: 2	Replicas: 2,0,1,3,4	Isr: 2,0,1,3,4
Topic: test	Partition: 13	Leader: 3	Replicas: 3,1,2,4,0	Isr: 3,1,2,4,0
Topic: test	Partition: 14	Leader: 4	Replicas: 4,2,3,0,1	Isr: 4,2,3,0,1
Topic: test	Partition: 15	Leader: 0	Replicas: 0,4,1,2,3	Isr: 0,4,1,2,3
Topic: test	Partition: 16	Leader: 1	Replicas: 1,0,2,3,4	Isr: 1,0,2,3,4
Topic: test	Partition: 17	Leader: 2	Replicas: 2,1,3,4,0	Isr: 2,1,3,4,0
Topic: test	Partition: 18	Leader: 3	Replicas: 3,2,4,0,1	Isr: 3,2,4,0,1
Topic: test	Partition: 19	Leader: 4	Replicas: 4,3,0,1,2	Isr: 4,3,0,1,2

topic 생성시 partition을 20으로 지정했기 때문에 Partition: 0부터  Partition: 19까지 20개의 목록이 보이게 된다. 

각 항목에 대해서는 추후에 설명하기로 하고 대략 이런 모습이란 것만 확인하고 넘어가자.

  • producer를 이용한 메시지 생성

다음과 같이 입력하여 message룰 생성해보자.
이 명령에서 보여지는 9092포트는 kafka 설정 파일인 server.properties에 “listeners=PLAINTEXT://:9092”로 

설정이 되어있다. 실행하면 빈 프롬프트가 생기고 적절한 메시지를 입력하면 된다. 


입력한 후 엔터를 치면 다음 메시지를 입력할 수 있다. message1에서 message6까지 메시지를 생성해보았다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-console-producer.sh --broker-list NAMENODE.local:9092 --topic test

message1
message2
message3
message4
message5
message6
  • cunsumer를 이용한 message 확인

다음과 같이 consumer를 실행하여 producer에서 생성한 메시지를 확인한다. 입력한 순서대로 출력되지 않는 것이 

아마도 Map 형식의 처리인 것으로 보인다. (가운데 빈 라인은 message 입력 제일 마지막에 엔터를 한 번 입력해서 

그렇다)

SECONDARY-NAMENODE:bin mazdah$ ./kafka-console-consumer.sh --zookeeper NAMENODE.local:2181 --topic test -rom-beginning

message5
message3

message1
message2
message4
message6
  • topic의 삭제

그냥 테스트로 만든 topic이니 미련 없이 삭제해보자. test이후에 test2라는 topic을 하나 더 만든 상태이다.
topic의 삭제는 생성과 동일한 구문으로 수행을 하며 —create 대신 —delete를 사용하면 된다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --delete --zookeeper NAMENODE.local:2181 --replication-factor 5 --partition 20 --topic test2

Topic test2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

그런데 여기서도 이상한 부분이 발견되었다. 마지막 줄에 보면 다음과 같은 내용이 있다.
Note: This will have no impact if delete.topic.enable is not set to true.

혹시나 해서 topic 리스트를 확인해보았다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --list --zookeeper NAMENODE.local:2181

test
test2 - marked for deletion

역시나 삭제한 test2 토픽에 marked for deletion라는 내용이 붙어있다. 어찌된 일일까? 

짐작했겠지만 답은 삭제시 보여진 NOTE의 내용에 있다. 설정 파일에 delete.topic.enable항목을 추가하고 

값을 true로 설정하면 된다.

delete.topic.enable=true

이후 다시 목록을 확인해보면 다음과 같이 말끔히 지워진 것을 확인할 수 있다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --list --zookeeper NAMENODE.local:2181

test

일단 오늘은 여기까지 기본적인 내용을 확인할 수 있었다.
다음 주에는 본격적으로 kafka에 대해 하나 하나 정리를 해보아야겠다.

블로그 이미지

마즈다

이제 반백이 되었지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^

댓글을 달아 주세요


이제야 발견한 Kafka


하던 일도 제대로 못하면서 빅데이터 공부해보겠다고 꼴깝을 떤 것이 
벌써 2013년 5월달 이야기네…


뭔가 새로운 것에 대해서는 남들 앞에서 한 마디나마 
거들 수 있어야 하지 않을까 하는 밑도 끝도 없는 초조감이
나를 뻘짓거리의 함정으로 이끌었다.


뭔가를 시작하기 전에는 지름신을 영접하는 것이 당연한(?) 의례인지라
이 때도 지름신을 조금 과하게(…ㅠ.ㅠ) 영접했다.


맥미니 5대…


그리고는 한 동안은 신났다.
Hadoop 설치하고 zookeeper 설치하고 Hbase 설치하고…
그리고…샘플 한 번 돌려보고? 끝이었나?…ㅠ.ㅠ


목표로 삼았던 것이 twitter의 데이터를 수집해서 이것 저것 분석하는
공부를 좀 해보고자 했는데…
이게 당최 감이 안잡히는 것이다.


twitter API를 통해 데이터를 가져오는 것이야 알겠는데…
이걸 어떻게 저장을 해야하는지…


처음에는 그냥 REST API를 이용해서 데이터를 가져오고
무작정 일반 txt 파일로 저장을 했다.
그러다가 Streaming API를 이용하게 되었는데 아무리 샘플 데이터라
하더라도 연속적으로 들어오는 데이터를 어떻게 저장을 해야 할지,
그리고 간단하게나마 가공을 하고자 하는데 가공을 하는 중 계속해서
들어오는 데이터는 어떻게 처리를 해야 하는지에 대한 해결책을 
전혀 몰랐다.



그렇게 3년이 지나버린 것이다.
그리고 우연히 kafka를 발견한 것이다.
프란츠 카프카도 아니고 해변의 카프카도 아니고 apache Kafka!


일단 2가지 측면에서 관심을 가지게 되었는데 하나는 Use Case 중 
Stream Processing이고 다른 하나는 Client에 node.js가 있다는 점이었다.


처음부터 twitter API를 이용하는데 node module을 사용한지라 계속해서
node module을 사용할 수있다는 것은 나에게는 큰 매리트였다.



아직은 대충 훑어본 정도라 과연 나의 목적에 딱 맞는지는 잘 모르겠으나
이 부분은 계속 적용을 해 나가면서 구체적으로 정리를 해야겠다.


당분간 주말에 할 것이 생겨서 즐겁네

블로그 이미지

마즈다

이제 반백이 되었지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^

댓글을 달아 주세요

최초 작성일 : 2012/01/04 19:05 


자세한 설명을 기대하신 분들껜 죄송...-.-


일단 Lion 서버에는 기본적으로 아파치와 svn이 설치되어있으므로
아파치와 svn 연동은 기존 리눅스에서 하던 것과 동일하게 처리함.

다음은 trac 설치 및 아파치, svn과의 연동이었는데 이 것은 맥에서 설치한
케이스를 검색하기가 쉽지 않고 몇군데 찾아본 곳에서 적힌대로 시도한 것은
모두 실패함

구글에서 검색한 결과 중 가장 깔끔한게 정리된 곳이 아래 링크


한방에 깔끔하게 trac을 설치하고 아파치 연동까지 완료.

마지막으로 trac과 svn 연동은 단지 trac.ini에서 repository 경로를 적어주고
몇가지 중요해보이지 않는 설정을 해주면 끝~

대략 3일간의 삽질 끝에 아파치 + svn + trac 설치를 완료하였다.
SSL까지 적용하려고 했으나 내부망에서 혼자 쓰는 것이기에 깔끔하게 생략~^^;;;

아름다운 나의 trac...@.@




블로그 이미지

마즈다

이제 반백이 되었지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^

댓글을 달아 주세요