본문 바로가기
  • SDXL 1.0 + 한복 LoRA
  • SDXL 1.0 + 한복 LoRA
Study/빅데이터

[간보기 | kafka] Apache kafka 시작하기

by 마즈다 2016. 3. 20.
반응형



Apache Kafka 시작하기


참조 사이트

http://kafka.apache.org
http://epicdevs.com/20

다행이 예전에 Hbase를 설치할 때 zookeeper를 설치해놓은 덕에 kafka를 설치하고 구동하는 과정은 

그리 어렵지 않았다.아다시피 요즘 OS가 Windows만 아니면 binary 패키지를 다운로드 받고 적절한 위치에 

압축 풀고 하면반은 된 것이나 다름 없다.


kafka의 경우 $KAFKA_HOME/config/server.properties에 몇가지 설정만 한 후 기동시키면 된다.
다음은 내 PC 환경에서의 설정 및 구동 과정이다.


PC 환경

현재 맥미니 5대를 내부망으로 연결시킨 상태로 각각의 PC에 대한 정보는
/etc/hosts 파일에 설정이 되어있다. 대략 다음과 같다.

172.30.1.33     NAMENODE.local
172.30.1.56     SECONDARY-NAMENODE.local
172.30.1.59     DATANODE1.local
172.30.1.39     DATANODE2.local
172.30.1.11     DATANODE3.local

때문에 대부분의 설정에서는 host name인 NAMENODE.local 등을 이용한다. 이후
설정 사항에 이점 참고하길 바란다.


zookeeper 설정

zookeeper는 5대의 노드 모두 설치가 되어있으며 대략 다음과 같이 
설정이 되어있다. 설정 파일은 $ZOOKEEPER_HOME/cont/zoo.cfg이다. 
물론 5대의 노드 모두 동일한 설정이다.

dataDir=/zookeeper
clientPort=2181

#Zookeeper Servers
server.0=NAMENODE.local:2888:3888
server.1=SECONDARY-NAMENODE.local:2888:3888
server.2=DATANODE1.local:2888:3888
server.3=DATANODE2.local:2888:3888
server.4=DATANODE3.local:2888:3888

zookeeper의 실행 명령은 다음과 같다. 
(zookeeper 설치 위치는 zookeeper-3.4.5다

NAMENODE1:bin mazdah$ /zookeeper-3.4.5/bin/zkServer.sh start

kafka 설치, 설정, 구동

  • 설정

앞서 말했듯이 다운로드 압축해제 적절한 위치 복사면 모든 설치는 끝이다. 내가 받은 버전은 

kafka_2.11-0.9.0.1이며 /kafka에 설치했다.


우선 설정파일을 열고 다음과 같이 설정을 하였다. 설정 파일은 /kafka/config/server.properties다.
가장 먼저 설정해주어야 할 부분은 broker.id이다. 

나는 총 5대의 노드가 있기 때문에 각각의 PC(node)에 0부터 4까지 아이디를 부여해주었다.
예를들어 NAMENODE.local의 설정은 다음과 같다.

broker.id=0

다음으로는 zookeeper에 대한 설정으로 zookeeper.connect라는 항목에 
앞서말한 5개의 노드를 모두 적어주었다.

zookeeper.connect=NAMENODE.local:2181,SECONDARY-NAMENODE.local:2181,DATANODE1.local:2181,DATANODE2.local:2181,DATANODE3.local:2181

나는 처음에 이 부분에 적는 2181 포트가 도대체 어떤 포트인가 고민하다가 나중에 zookeeper의 cleintPort 

설정을 보고는 이해했다…-.- 그밖의 나머지 설정은 그냥 default로 두었다.


나중에 추가한 설정이 있는데 이 것은 나중에 상황을 설명하면서 함께 이야기 하겠다.

  • kafka 서버 기동

이제 kafka 서버를 기동해보자. 각각의 노드에서 /kafka/bin으로 이동한 후 다음과 같이 입력하여 기동한다.

NAMENODE:bin mazdah$ ./kafka-server-start.sh ../config/server.properties

아래와 같이 kafka 설정 정보들이 죽 나오고 이후 서버 start 메시지가 출력되면서 기동된다.

[2016-03-20 11:38:57,142] INFO KafkaConfig values: 
advertised.host.name = null
metric.reporters = []
quota.producer.default = 9223372036854775807
offsets.topic.num.partitions = 50
.
.
.

이후 테스트는 http://epicdevs.com/20의 내용을 그대로 진행해보았다.
물론 서버 정보 등은 나의 환경에 맞춰서..

  • topic 생성

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --create --zookeeper SECONDARY-NAMENODE.local:2181 --replication-factor 5 --partition 20 --topic test

그런데 처음부터 문제가 생겼다. 위와같이 시작을 했는데 아래와 같은 오류가 발생을 했다.

Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 30000
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1223)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:155)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:129)
at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:89)
at kafka.utils.ZkUtils$.apply(ZkUtils.scala:71)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
at kafka.admin.TopicCommand.main(TopicCommand.scala)

ps 명령어로 각 노드에서 확인을 해보아도 모든 zookeeper 인스턴스들이 정상적으로 떠있는 상태였는데 

zookeeper 연결시 타임아웃으로 연결할 수 없다니…ㅠ.ㅠ


일단 퍼뜩 떠오르는 생각은 zookeeper의 leader가 NAMENODE.local에 설정되어 있다는 것이다. 

아마도 topic 생성 등의 중요 작업은 zookeeper leader 노드에서 해야 하는 것 같다. 

다음과 같이 NAMENODE.local에서 topic이 생성되도록 하니 정상적으로 topic이 생성되었다.
(실제로 zookeeper의 leader node에서 실행하지 않아도 —zookeeper 옵션만leader node로 해주어도 된다). 
다음과 같이 성공하였다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --create --zookeeper NAMENODE.local:2181 --replication-factor 5 --partition 20 --topic test

Created topic "test".
  • topic 리스트 보기

자 그러면 이제 topic 리스트를 한 번 보도록 하자.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --list --zookeeper NAMENODE.local:2181

test

아직은 만들어진 topic이 test 하나밖에 없기 때문에 test만 출력된다.

  • topic 상세 보기

그리고 다음과 같이 test topic의 상세 내용을 확인해보자.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --describe --zookeeper NAMENODE.local:2181 --topic test

Topic:test	PartitionCount:20	ReplicationFactor:5	Configs:
Topic: test	Partition: 0	Leader: 0	Replicas: 0,1,2,3,4	Isr: 0,1,2,3,4
Topic: test	Partition: 1	Leader: 1	Replicas: 1,2,3,4,0	Isr: 1,2,3,4,0
Topic: test	Partition: 2	Leader: 2	Replicas: 2,3,4,0,1	Isr: 2,3,4,0,1
Topic: test	Partition: 3	Leader: 3	Replicas: 3,4,0,1,2	Isr: 3,4,0,1,2
Topic: test	Partition: 4	Leader: 4	Replicas: 4,0,1,2,3	Isr: 4,0,1,2,3
Topic: test	Partition: 5	Leader: 0	Replicas: 0,2,3,4,1	Isr: 0,2,3,4,1
Topic: test	Partition: 6	Leader: 1	Replicas: 1,3,4,0,2	Isr: 1,3,4,0,2
Topic: test	Partition: 7	Leader: 2	Replicas: 2,4,0,1,3	Isr: 2,4,0,1,3
Topic: test	Partition: 8	Leader: 3	Replicas: 3,0,1,2,4	Isr: 3,0,1,2,4
Topic: test	Partition: 9	Leader: 4	Replicas: 4,1,2,3,0	Isr: 4,1,2,3,0
Topic: test	Partition: 10	Leader: 0	Replicas: 0,3,4,1,2	Isr: 0,3,4,1,2
Topic: test	Partition: 11	Leader: 1	Replicas: 1,4,0,2,3	Isr: 1,4,0,2,3
Topic: test	Partition: 12	Leader: 2	Replicas: 2,0,1,3,4	Isr: 2,0,1,3,4
Topic: test	Partition: 13	Leader: 3	Replicas: 3,1,2,4,0	Isr: 3,1,2,4,0
Topic: test	Partition: 14	Leader: 4	Replicas: 4,2,3,0,1	Isr: 4,2,3,0,1
Topic: test	Partition: 15	Leader: 0	Replicas: 0,4,1,2,3	Isr: 0,4,1,2,3
Topic: test	Partition: 16	Leader: 1	Replicas: 1,0,2,3,4	Isr: 1,0,2,3,4
Topic: test	Partition: 17	Leader: 2	Replicas: 2,1,3,4,0	Isr: 2,1,3,4,0
Topic: test	Partition: 18	Leader: 3	Replicas: 3,2,4,0,1	Isr: 3,2,4,0,1
Topic: test	Partition: 19	Leader: 4	Replicas: 4,3,0,1,2	Isr: 4,3,0,1,2

topic 생성시 partition을 20으로 지정했기 때문에 Partition: 0부터  Partition: 19까지 20개의 목록이 보이게 된다. 

각 항목에 대해서는 추후에 설명하기로 하고 대략 이런 모습이란 것만 확인하고 넘어가자.

  • producer를 이용한 메시지 생성

다음과 같이 입력하여 message룰 생성해보자.
이 명령에서 보여지는 9092포트는 kafka 설정 파일인 server.properties에 “listeners=PLAINTEXT://:9092”로 

설정이 되어있다. 실행하면 빈 프롬프트가 생기고 적절한 메시지를 입력하면 된다. 


입력한 후 엔터를 치면 다음 메시지를 입력할 수 있다. message1에서 message6까지 메시지를 생성해보았다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-console-producer.sh --broker-list NAMENODE.local:9092 --topic test

message1
message2
message3
message4
message5
message6
  • cunsumer를 이용한 message 확인

다음과 같이 consumer를 실행하여 producer에서 생성한 메시지를 확인한다. 입력한 순서대로 출력되지 않는 것이 

아마도 Map 형식의 처리인 것으로 보인다. (가운데 빈 라인은 message 입력 제일 마지막에 엔터를 한 번 입력해서 

그렇다)

SECONDARY-NAMENODE:bin mazdah$ ./kafka-console-consumer.sh --zookeeper NAMENODE.local:2181 --topic test -rom-beginning

message5
message3

message1
message2
message4
message6
  • topic의 삭제

그냥 테스트로 만든 topic이니 미련 없이 삭제해보자. test이후에 test2라는 topic을 하나 더 만든 상태이다.
topic의 삭제는 생성과 동일한 구문으로 수행을 하며 —create 대신 —delete를 사용하면 된다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --delete --zookeeper NAMENODE.local:2181 --replication-factor 5 --partition 20 --topic test2

Topic test2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

그런데 여기서도 이상한 부분이 발견되었다. 마지막 줄에 보면 다음과 같은 내용이 있다.
Note: This will have no impact if delete.topic.enable is not set to true.

혹시나 해서 topic 리스트를 확인해보았다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --list --zookeeper NAMENODE.local:2181

test
test2 - marked for deletion

역시나 삭제한 test2 토픽에 marked for deletion라는 내용이 붙어있다. 어찌된 일일까? 

짐작했겠지만 답은 삭제시 보여진 NOTE의 내용에 있다. 설정 파일에 delete.topic.enable항목을 추가하고 

값을 true로 설정하면 된다.

delete.topic.enable=true

이후 다시 목록을 확인해보면 다음과 같이 말끔히 지워진 것을 확인할 수 있다.

SECONDARY-NAMENODE:bin mazdah$ ./kafka-topics.sh --list --zookeeper NAMENODE.local:2181

test

일단 오늘은 여기까지 기본적인 내용을 확인할 수 있었다.
다음 주에는 본격적으로 kafka에 대해 하나 하나 정리를 해보아야겠다.

반응형