인공지능 공부해보겠다고 설레발치기 시작한 것이 어언 2년여가 다되간다.

그간 나름 책도 좀 보고 동영상 강좌도 좀 보고...페이퍼나 논문은 하나도 안보고...ㅠ.ㅠ


그간 읽은 책들과 읽는 중인 책 그리고 읽기 위해 사놓은 책을 좀 정리해보면


읽은 책

텐서플로우 첫걸음
골빈해커의3분 딥러닝
머신러닝 워크북
밑바닥부터 시작하는 딥러닝

읽는 중인 책

핸즈 온 머신러닝
머신러닝 탐구생활


읽다가 보류한 책

강화학습 첫걸음
기초 수학으로 이해하는 머신러닝 알고리즘
처음 배우는 딥러닝 수학
프로그래머를 위한 선형대수


사놓기만 한 책

머신러닝 실무 프로젝트
딥러닝의 정석
러닝 텐서플로우
머신러닝 딥러닝 실전 개발 입문




하지만 공부를 해 가면 해 갈수록 궁금한 것은 더 많아지고 이제는 과연 내가 뭔가를 이해 하고는 있는 건가 하는 생각에

자괴감이 들기 시작했다. 똑같은 내용을 공부하고 있는데 볼수록 새로운 느낌?


결국 자괴감을 이기지 못하고 처음부터 다시 시작하는 길을 선택했다. 처음 시작부터 이해하지 못하고 넘어간 부분들을

차근차근 정리하고, 이해하고 넘어가야 할 것 같아서...(그런 의미에서 현재 읽고있는 2권의 책은 꽤 도움이 되는 것 같다)


특히나 전반적으로 내용을 이해하지 못하게 하는 주된 이유 중 하나가 어려운 용어들과 복잡한 공식들이기도 하고 

머신러닝의 기초가 되는 선형 회귀 등을 제대로 이해하지 않고 겁대가리 없이 덥썩 딥러닝으로 직행한 무모함도 충분히 

일조를 했기에 다시 처음부터 하나 하나를 정리하는 것으로부터 시작하기로 했다. 그리고 


패배적인 자기 만족일지는 모르겠으나 어차피 이 공부는 내가 이 분야의 전문가가 되기 위한 것이라기 보다는 나의 지적 만족을

위한 것이니 쉬엄쉬엄 간들 어떠랴 싶다. 태공망 여상은 나이 80에 주문왕을 만나 그 재능을 펼치기 시작했다는데 그렇다면

나에게는 아직 30년이란 시간이 남은 것 아닌가(뜻밖의 나이 공개가...ㅠ.ㅠ)


느려도 황소 걸음이랬으니 차분하게 한걸음 한걸음 가보자.

그 시작은 정규 방정식이다.


정규방정식

사실 그동안 비용함수를 최소화 하는 가중치를 찾기 위한 방법으로 경사하강법만을 알고 있었는데 이번에 핸즈 온 머신러닝을

읽으면서 처음 정규방정식이란 것이 있다는 것을 알게 되었다.


문제는 이 정규방적이라는 것이 행렬식으로 표현이 되어있어 문돌이의 사고방식으로는 이 것이 어떻게 경사하강법과 동일한

기능을 하게 되는지 이해가 가지 않는 것이었다. 그래서 새로운 시작의 첫 출발을 단순 선형회귀의 비용함수로부터 정규방정식을

도출하는 과정을 정리해보고자 한다.



선형 회귀 비용함수로부터 정규방정식 도출하기


복습

선형회귀의 가설함수 식에서 편향을 제거하자. 방법은 그냥 b = 0으로 초기화 하는 것이다.




    •비용함수도 다시 한 번 확인하자.



















사전 확인1 - ∑를 행렬로



∑ 로 표현되는 제곱의 합은 그 수들을 요소로 하는행렬과그 행렬의 전치행렬의 곱과 같다(복잡하니
1
행짜리 행렬로 확인해보자).




















사전 확인2전치행렬의 성질


     •전치행렬은 다음과 같은 성질이 있다.




























우선 cost함수는 W에 대한 함수이므로 함수 표기를 바꿔보자(함수명MSE는 최소제곱법의 영문 표기인 Mean Square Error의 약어이다).

이제 명확하게 이 함수는 x와 y에 대한 함수가 아니라 W에 대한 함수로 보일 것이다.









함수는W에 대한 함수인데 정작 함수 식에는 W가 안보이니 내부 함수도 원래대로 치환하자.






















사전 확인한 내용을 상기하면서 번 식으로 변환해보자
전치행렬의 성질에 따라 번 식으로 전개할 수 있다.
W를 포함한 식들을 다시 정리하면 번 식이 된다.
다시 한 번 전치행렬의 성질에 따라 식을 전개하면 번 식이 된다.
이 변형은 최초의 시그마 식을 전개해서 진행해도 동일한 결과가 나온다.






최종 정리된 식은 과 같고 이제 이 값을 미분하여비용함수가 최솟값을 갖는 W를 찾을 것이다.

비용함수가 최솟값을 갖기 위해서는 비용함수를 미분한 값이 0이 되어야 한다.
미분 과정을 명확하게 하기 위해 식을 한 번 정리해 주자(식 ).주의할 것은 W에 대해 미분한다는 점이다.
행렬 A에 대해 자신과 자신의 전치행렬과의 곱은제곱과 같다고 했다.그리고 전치행렬의 성질에 따라W와 X의 곱의 전치행렬은 X의 전치행렬과 W의 전치행렬의 곱과 같다(W와 X의 순서가 바뀜에 주의).




이제 거의 다 왔다.
미분한 함수는 식 와 같고 이제 
거추장스러운1/m도 없앨 수 
있다(사실 진작에 1/m을 없애고
보다 깔끔하게 식을 전개할 수도 
있었으나 나같은문돌이는 갑자기 
저런거 하나 없어져도 극도의
멘붕에 시달릴 수 있기에 끝까지 
가져왔다-.-).










최종 미분식을 W에 대해 정리해보자.
    •이렇게 해서 단순 선형 회귀의 비용함수로부터 정규방정식을 도출해보았다.













선형 회귀 비용함수로부터 정규방정식 도출하기2



정규방정식은 다른 형태로도 
도출할 수 있다우선 최초의 
식을 전개해보자.














이후 전개한 식을 W에 대해 미분한다.














최종 정리한 후 시그마를 
행렬로 변환해보자














하지만 아직 이해하지 못한 것이 하나 있다.
가설함수에서 편향을 제거하지않고 WX + b의 형태로 이 과정을 진행하게 되면 최종 정규방정식은 좌측과 같이 나온다.이 것이 앞서 도출해본 정규방정식과 동일한 식이란 것을 문돌이의 두뇌로는 이해하기 힘들다.ㅠ (분모와 분자 각각 - 뒤에 붙어있는 값들은 대체 어쩔...ㅠ.ㅠ)






일단 정규방정식은 좌측의 식으로 

알아두자

정규방정식은 행렬식으로 경사
하강법에 비해 많은 연산량이 필요
하지도 않고 학습률 설정 등
골치아픈 메타파라미터의 
설정을 신경쓰지 않아도 된다.
하지만 행렬 연산이다보니 특성의 
수가 늘어나면 계산속도가 많이 
느려지게 된다.다만 샘플 수에 
대해서는 선형적으로 비례한다고 
한다.
또한 정규방정식으로 학습된 선형 
모델은 예측이 매우 빠르다고 한다
(핸즈 온 머신러닝)



정리


이렇게 해서 새롭게 시작하는 인공지능 학습의 첫 단추를 꿰었다. 하지만 이렇게 차근차근 분석을 하면서도 여전히

어떤 부분에 대해서는 완전하게 이해하지 못한 채 그저 직관적인 이해로 두루뭉술하게 넘어가고 있는 상황이다.

사실 이러한 상태가 가장 환장하는 상태이다. 전체적인 흐름은 대충 이해가 가는데 어떤 디테일한 부분에서

뭔가 막혀있는 듯한 느낌...



첫 대상인 정규방정식도 정리를 하고 보니 아직은 부족한 상태라는 것을 알게 되었다.

이러한 과정이 큰 도움이 되는 것 같다.


아무튼 이번에는 용어 하나, 공식 하나도 집중해서 보면서 차근차근 진행을 해나가 보자. 

머신러닝 reboot는 이제 시작이다!




블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^

Elasticsearch








Elasticserach에 Excel 데이터 입력하기 - JAVA API와 몇가지 설정


지난 시간에는 간단에서 Spring boot를 설정하면서 확인해야 했던 부분들을 중심으로 정리를 하였다.
일단 웹 프레이임워크가 갖추어졌으니 이제 시스템을 만들어가는 일만 남았다. 물론 파일 업로드, 엑셀 파싱 등의
기능들도 필요하지만 역시 가장 중요한 것은 Elasticsearch를 이용할 수 있게 해주는 API일 것이다.


지난 포스팅에서도 언급했지만 이미 Spring에는 Elasticsearch와 관련된 프로젝트가 있다. 하지만 안타깝게도
Spring Data Elasticsearch 프로젝트의 최신 버전도 아직은 Elasticsearch의 6.x 버전을 지원하지 못한다
(내가 이 작업을 시작하면서 검색했을 때는 Elasticsearch 2.4까지만 지원한다고 했었는데 그새 지원 버전이 조금
올라가긴 했다).


그래서 별도의 API 라이브러리를 참조하여 작업을 진행하였다.
물론 많은 API들이 존재하지만 오늘은 간단하게 Index 생성과 관련된 내용들만 살펴보도록 하겠다.


Client 연결


Index를 생성하기 위해서는 우선 Elasticsearch cluster의 노드에 접근을 해야 한다. API에서는 Client 인스턴스를
생성하여 연결한다. Client 클래스는 몇가지가 있는데 Low Level REST Client로 RestClient 클래스를 사용할 수
있고 이 RestClient를 wrapping한 RestHighLevelClient 클래스는 High Level REST Client라고 부른다.
여기에 다시 Indices(Elasticsearch 내부에서 관리하는 index들을 indices라 부른다)접근하기 위해
RestHighLevelClient를 한번 더 wrapping한 IndicesClient가 있고, 이 외에 TransportClient가 있다.



그런데 이 TransportClient는 조금 독특하게 HTTP가 아닌 TCP 프로토콜을 이용하며 따라서 사용하는 포트도
REST Client들이 기본 값을 기준으로 9200포트를 이용하는데 반해 TransportClient는 9300 포트를 이용한다.


TransportClient는 Elasticsearch 7.0에서 deprecate 예정이며 8.0에서는 제거될 것이라고 한다. 
TransportClient에 대한 자세한 내용은 아래 링크를 참조하도록 하자.


https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html


처음에는 TransportClient를 이용하느라 고생을 좀 했다. 그러다가 deprecate 예정이라는 정보를 보고는 미련 없이
REST Client로 바꾸어 사용하기로 했다.


기본적으로 High Level REST Client인 RestHighLevelClient 클래스를 사용하게 되겠지만 그 전에 Low Level 
REST Client에서 중요하게 짚고 넘어가야 할 부분이 있다(어차피 RescClient의 builder를 통해 생성하니 당연한
이야기이겠지만...). 바로 다음 링크에 있는 내용들 때문이다.


https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.2/java-rest-low.html


내용을 간략하게 보자면 Low Level REST Client에는 load balancing이라든지 failover, 장애 노드에 대한
패널티 부여, 그리고 옵션 사항이지만 전체 클러스터에서의 노드 찾기 등 클러스터를 관리하기 위해 필요한 많은
기능들이 구현되어있다. 특히 load balancing의 경우 clietn 생성시 파라미터로 전달된 각 노드들을 round-robin
방식으로 접근하여 rquest를 보내게 된다. 자세한 내용은 아래 링크에서 확인할 수 있다.


https://artifacts.elastic.co/javadoc/org/elasticsearch/client/elasticsearch-rest-client/6.2.3/org/elasticsearch/client/RestClient.html


마지막으로 client 연결 시 애를 먹었던 부분이 X-pack을 설치한 후 Elasticsearch 접근 시 계정 인증이 필요하게
되었는데 이에 대한 처리를 하느라 고생을 좀 했다. 이 부분은 샘플 코드로 설명을 대신한다.


public static RestHighLevelClient newRestHighLevelClient() {
 // X-pack 설치 시 아래와 같이 자격 증명을 해주어야 한다. user와 password에 각각 X-pack을 통해 설정한
 // ID와 비밀번호를 입력하면 되는데 ID는 보통 elastic이고 비밀번호는 자동 생성된 값이다. 
	final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
	credentialsProvider.setCredentials(AuthScope.ANY,
		        new UsernamePasswordCredentials(user, password));
		
	RestHighLevelClient client = new RestHighLevelClient(
		RestClient.builder(
			new HttpHost(hostData1, Integer.valueOf(httpPort), "http"))
		        .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
		            @Override
		            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
		                httpClientBuilder.disableAuthCaching(); 
		                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
			}
	 }));

	return client;
}



API 구현


아직까지는 Excel 데이터를 Elasticsearch로 입력하는 기능만을 구현하였기에 실제로 사용하는 API는 Create Index API(Index 생성 시 사용)와 Index API(Index를 이용하여 데이터를 입력하는 작업에 사용) 뿐이다.


Elasticsearch의 JAVA API들은 모두 2가지 종류가 있는데 바로 synchronous와 asynchronous 방식이다.
익히 알고 있듯이 synchronous는 요청을 한 후 그 결과를 리턴받은 후 프로세스가 진행되지만 asynchronous의
경우 요청후 바로 다음 프로세스가 진행되며 요청한 프로세스에 대한 결과는 별도로 구현된 listener에 의해 처리된다.
따라서 asynchronous API를 구현하는 경우에는 listener를 구현한 후 이 listener를 파라미터로 전달해야 한다. 


나같은 경우 처음에 asynchronous 방식을 알지 못한 상태에서 Elasticsearch API들을 모두 util성 클래스에
static 메소드로 구현을 했는데 아무래도 한 번 뒤집어 엎어야겠다...ㅠ.ㅠ


각 API 구현은 아래 링크의 예제를 거의 그대로 사용하였다.


https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.2/java-rest-high-create-index.html

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.2/java-rest-high-document-index.html


전체적인 흐름은 우선 기존에 생성된 Index가 없는 경우에는 새로운 Index를 생성하도록 하고 이미 생성된 Index가
있는 경우에는 기존 Index와 type을 select 박스를 통해 선택하여 데이터를 입력하거나 아니면 새로운 Index를
생성하는 작업부터 시작하는 것을 선택하도록 하였다.


새로운 Index 생성 시에는 다음과 같은 파라미터를 입력받는다(아직 validation 체크 기능은 없다...-.-).


  1. Index 명
  2. alias
  3. type
  4. shard 수
  5. replica 수
  6. mapping 정보


Index가 생성되고 나면 생성된 Index들과 type들을 선택하여 데이터를 업로드 하는 화면으로 전환된다.
파일 업로드 기능을 통해 엑셀 파일을 업로드 하면 되는데 파일만 업로드한 후 나중에 데이터를 입력할 수도 있고
파일 업로드가 끝나면 바로 데이터 입력이 시작되도록 할 수도 있다.


문제는 데이터의 양이다.


이전 포스팅에서 말한 것처럼 현재 작업을 하려는 데이터는 대략 18개의 열과 50만개의 행으로 구성된 엑셀 파일이다.
가급적이면 다른 전처리(데이터 정제 작업 제외) 없이 한 번에 입력하기를 원하지만 웬만한 시스템이 아니면 입력 중
OOM을 맞닥뜨려야만 했다(물론 개발자 PC로써도 사양은 좀 낮았다...ㅠ.ㅠ).


다수의 데이터를 한 번에 입력하는 작업인만큼 bulk API를 이용하여 작업을 하였다. 처음에는 전체 데이터를 입력
하도록 해보았으나 Elasticsearch에서 timeout이 걸리고 말았다. kibana로 확인해보니 데이터는 모두 입력 된 것
같은데 정상적으로 종료 처리가 되지 않았다. 결국 현재 내 시스템에서 안정적인 입력 량인 10만 건 단위로 나누어
bulk request를 보내도록 구현하였다. 이렇게 하니 50만 건 입력하는데 대략 3분 전후가 걸렸다.


Elasticsearch 설정


하지만 API 구현쪽에서만 처리한다고 모든 것이 해결되는 것은 아니었다.
사실 개발자로서 굳이 알아야 하나 하는 생각도 들긴 하지만 그래도 어렵지 않은 내용이니 아주 얕은 수준에서는
튜닝(이라고 말하기는 부끄럽지만...-.-)은 해주는 것이 좋을 것 같았다. 유일하게 해준 작업은 jvm.options 파일에서
Xms와 Xmx를 수정한 것이다. 파일 경로는 ${ELASTIC_HOME}/config/jvm.options이다.


다만 이렇게 heap size를 설정할 때 주의할 사항이 있는데 일단 heap size가 커지면 가용성은 좋아지지만 GC 수행
시간이 오래 걸리는 단점이 있고 Xmx의 경우 OS의 커널 시스템이 사용할 부분을 고려하여 전체 메모리의 50%를
넘지 않도록 권고하고 있다. 그밖에 compressed ordinary object pointers라는 조금은 전문적인 내용에 대한
권고사항이 있는데 이는 링크로 대신한다.


https://www.elastic.co/guide/en/elasticsearch/reference/current/heap-size.html


위 링크에 보면 heap size를 jvm.options 파일이 아닌 시스템 환경변수에 설정하는 방법도 나와있으니 참고하자.


다음으로는 Web을 통한 접근과 관련된 설정으로 시스템 구현 후 뭔가 허전하여 간단하게 Elasticsearch의 몇가지
정보를 확인할 수 있는 버튼을 추가하였다. 이 작업은 다음 번에 포스팅하겠지만 jQuery를 통해 직접 REST API를
호출하도록 하였는데, 이 때 몇가지 오류가 발생을 하였다. 웹에서 접근시 발생하는 오류를 막기 위해서는 설정 파일인
elasticsearch.yml 파일에 다음의 내용을 추가해주어야 한다.


http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-credentials: true
http.cors.allow-headers: "X-Requested-With, Content-Type, Content-Length, Authorization"
http.cors.allow-methods: OPTIONS, HEAD, GET, POST, PUT, DELETE


당장에 운영 시스템을 관리할 것이 아니라면 그냥 이정도 설정이면 충분할 것이다.


정리


Elasticsearch의 API는 워낙 간단하게 구현되어있어서 달리 설명할 것도 없을뿐더러 오히려 공식 홈페이지에 더 잘
설명이 되어있기에 굳이 이 자리에서 다시 설명할 필요를 못느낀다. 실제로 구현한 내용도 몇가지 시스템에 특화된 
내용을 제외하고는 공식 홈페이지의 예제 snippet를 그대로 Copy & Paste한 수준이다.


어쨌든 50만건의 데이터를 파일 업로드 한 번으로 3분정도의 시간에 Elasticsearch로 입력할 수 있게 되어 나름
만족스럽다. 다만 경력 18년차의 코드로 보기에는 너무 형편없는 코드를 공개해야 하나 생각하니 부끄부끄할 뿐...*^^*


잠깐 삼천포를 좀 들르자면 사실 현재 실질적으로 가장 필요하고 또 공부하고 싶은 부분은 테스트 코드에 관한
부분이다. TDD든 아니면 단순 Unit Test든...테스트 코드도 없는 소스를 공개하려니 뭔가 알맹이는 홀랑 까먹고
껍데기만 올리는 기분이랄까? (거꾸로인가?)


기본적인 시스템 구현 내용은 이정도에서 마무리하고 마지막 포스팅에서는 짧게나마 jQuery에서 REST API를
호출하는 부분을 살펴보고 만들어진 시스템에 대해 설명하고 마치고자 한다. 지금은 형편없고 특정 목적을 위해
만들어진 시스템이지만 평생 목표로 다듬어가야겠다.

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^










Elasticserach에 Excel 데이터 입력하기 - 기본 설정과 적용 라이브러리


올해에는 선택과 집중을 분명히 하기로 했는데…제 버릇 개 못준다더니…어김없이 또 여기저기를 들쑤시기 시작했다…-.-


Hadoop이니 Hbase니 Spark니 잔뜩 설치해놓고는 다시 Elasticsearch에 관심을 갖게 된 것이다. 어떤 것인지 한 번
설치나 해보자고 시작한 것이 쉬운 설치 방법과 사용법에 혹해서 더 깊은 내용을 알고 싶어진 것이다. 마침 분석해보고
싶은 데이터가 있어 이참에 한번 Elasticsearch를 이용해보자고 마음먹었다.


현재까지 진행된 작업은 spring boot 기반의 웹 시스템을 통해 Excel파일을 업로드하면 이를 파싱하여 
Elasticsearch로 입력하고 업로드된 파일들은 별도로 관리 가능하도록 만든 것이다. 앞으로 3차례에 걸쳐 이 개발 
과정을 정리해보도록 하겠다.


발단


2월 경…새 집으로 이사를 좀 하는게 어떨까 싶어 새 집을 물색하였다. 그리고 기왕지사 옮기는 것, 가급적이면
앞으로 집 값도 좀 올라 주면 좋을 것 같다는 생각이 들었다. 하지만 부동산이라고는 쥐뿔도 모르는 상태에서 어디
그게 쉬운일이던가…그냥 새 집은 새 집이고 마침 어떤 데이터로 빅데이터나 AI를 공부해보나 하던 참이라 부동산
데이터를 사용해보면 어떨까 하는 생각이 들어 공공 데이터 포털에서 부동산 관련 데이터를 모으기 시작했다.


처음 모은 데이터는 1996년 부터 2017년까지의 공시지가 데이터였다. 그리고 첫 난관이 시작되었다.
데이터는 모았는데 이 데이터를 어떻게 Elasticsearch에 넣어야 할지 방법을 몰랐던 것이다. 그렇게 Excel
데이터를 Elasticsearch로 입력하는 방법을 찾다가 가장 적절해 보이는 솔루션으로 찾은 것이 excelastic이라는
vert.x 기반의 stand alone 애플리케이션이었다.


그런데 말입니다…안타깝게도 이 애플리케이션도 문제가 있었다. 클라이언트 PC 및 Elasticsearch가 설치된 서버의
사양과도 관계가 있겠지만 10만 건 정도 입력을 시도하면 여지없이 OOM이 발생하여 정상 입력이 되지 않았다.
시행착오를 거쳐 확인한 안정적인 입력 건수는 약 5만 건 정도였다. 공시지가 데이터가 년도당 약 50만 건의 데이터가
있는데 이 파일을 Elasticsearch로 입력하려면 파일을 10개로 쪼개는 작업을 해주어야 한다는 말이다. 이 작업 조차
웬만한 PC에서는 쉽지 않다. 내 맥미니가 i5(2.5GHz)에 RAM 16Gb인데도 50만 건 들어있는 Excel 파일을 열어서
5만 건씩 10개로 쪼개다보면 버벅거리기가 일쑤였다.


그래서 목마른 놈이 우물을 판다고…직접 하나 만들기로 했다. 그리고 기왕 만드는 김에 이것저것 기능을 좀 추가해보자
했는데 마침 또 회사에서 KMS를 Elasticsearch 기반으로 만들면 어떻겠냐는 이야기가 나와 겸사겸사 함께 진행해
보기로 했다. 


관련 기술들


이 작업에 사용된 기술들은 다음과 같다.


  1. Spring boot 2.0.0
  2. jQuery + bootstrap (UI는 AdminLTE라는 오픈소스 사용)
  3. Elasticsearch 6.2.1 ( + X-Pack)
  4. MySQL 5.6.38
  5. Spring Tool Suite 3.9.2


각 기술의 세세한 부분보다는 작업을 진행하면서 어려움을 겪었던 부분들 혹은 편리했던 기능들에 대한 팁 수준의
정리를 진행하고자 한다.


Spring boot로 삽질하기


작년까지는 현재 일하는 곳의 업무 시스템 개발을 위해 Spring으로 2차례 정도 가벼운 웹 시스템을 개발한 적이 있다.
그 과정에서 Spring boot를 개인 프로젝트에 사용한 적은 있지만 잠깐 건드려보다가 방치되고 말았다. 그리고는 이번에
다시 Spring boot를 이용하기로 했다. 마음같아서는 마이크로서비스에 대한 공부도 곁들여 하면서 구현을 해보고
싶었으나 너무 학습의 범위가 넓어질 것 같아 그냥 Spring을 쓰듯 Spring boot를 쓰기로 했다…-.-


STS에서 서버(Tomcat) 사용하기


경력에 걸맞지 않은 초보적인 실수가 참 많다…ㅠ.ㅠ 늘 겪는 실수 중 하나가 프로젝트를 생성한 후 STS에서 바로
서버 연결하여 실행하는 부분인데 이번에도 여지없이 프로젝트를 생성하고 나니 프로젝트의 서버 설정이 뭔가 이상하다.


서버를 선택할 수 있는 화면이 나오지 않고 “This project is not associated with any server”라는 문구가 보인다.


이 것은 프로젝트 생성 처음 설정에서 packaging 항목을 jar로 선택한 결과이다. jar로 선택한 경우 Stand alone
프로젝트로 판단하여 외부 서버와 연결하는 설정이 나타나지 않는다. 




packaging을 war로 하면 서버 설정 창에서 Tomcat 등의 was와 연결이 가능해진다.




Security 사용


프로젝트를 생성한 후 기본적인 REST API를 구현하였고 테스트를 위해 STS 내에서 Tomcat을 구동시켜 브라우저를
통해 URL을 호출하여보았다. 그런데…난데없이 계정 입력창이 뜨는 것이 아닌가?


Spring security


확인 결과 이 것은 프로젝트 생성 시 의존성 설정 부분에서 Security를 체크했기 때문이었다. 



Security를 체크함으로 해서 많은 부분에 영향을 받았다. 파일 업로드, iframe 사용 등에서도 문제가 생겨 확인해보면
모두 Security 관련 설정 때문이었다. 계정 로그인 처리, 파일 업로드 문제, iframe 사용과 관련된 각각의 내용들을
모두 확인 후 적용하였으나 아직은 잘 모르는 부분이 많기에 아래 코드로 Security는 bypass하는 수준에서 적용을
마무리 하였다.


@Override
public void configure(WebSecurity web) throws Exception {
	// TODO Auto-generated method stub
	super.configure(web);
		
	web.ignoring().antMatchers("/**");
}


위 코드는 JAVA config 설정을 이용할 경우 WebSecurityConfigurerAdapter를 상속받은 config 클래스를 
생성하여 코딩하면 된다. Spring (boot)에서 Security를 사용하는 방법은 아래 링크를 참조하였다.


https://spring.io/guides/gs/securing-web/


DB 연결 설정


Spring을 이용하는 경우에는 보통 다음과 같은 과정을 거쳐 DB를 연결하고 CRUD 작업을 진행하였다.


  1. DataSource 처리를 위한 Config 클래스 생성
  2. 필요에 따라 properties 파일에 DB 연결정보 추가
  3. Service 인터페이스와 그 구현 클래스 생성
  4. Controller 클래스에서 의존성 주입을 통해 Service에 선언한 CRUD 메서드를 이용하여 작업


처음엔 이 과정만 생각하고 진행하다가 꽤 심한 삽질을 했다. 내가 ORM과 관련하여 JPA를 사용하도록 설정한 것을
깜빡 한 것이다. JPA를 이용할 경우에는 1번과 3번의 과정이 필요없다. application.properties에 DB 연결 설정만
등록하면 바로 DB와 연결이 되며 Entity 클래스와 Repository 인터페이스를 구현하여 사용하면 된다.


JPA를 통한 MySQL 연동은 아래 두 곳의 사이트에서 도움을 받았다.


https://docs.spring.io/spring-data/jpa/docs/2.0.5.RELEASE/reference/html/

https://www.callicoder.com/spring-boot-rest-api-tutorial-with-mysql-jpa-hibernate/


Elasticsearch API 사용


Spring 프로젝트 중에도 Spring Data Elasticsearch라는 관련 프로젝트가 있다. 관련 링크는 아래와 같다.


https://projects.spring.io/spring-data-elasticsearch/


하지만 내용을 살펴보면 알겠지만 가장 최신 버전의 릴리즈도 Elasticsearch 5.5.0까지만 지원을 한다.
아래 링크를 보면 Spring Data Elasticsearch의 각 버전과 그 버전에서 지원하는 Elasticsearch 버전이
정리되어 있다.


https://github.com/spring-projects/spring-data-elasticsearch


하지만 나는 이미 Ealsticsearch 6.2.1 버전을 설치한터라 안타깝게도 Spring Data Elasticsearch는
사용하지 못하고 별도로 6.2 버전대의 라이브러리를 pom.xml 파일에 다음과 같이 추가하였다.


<dependency>
	<groupId>org.elasticsearch</groupId>
	<artifactId>elasticsearch-core</artifactId>
	<version>6.2.2</version>
</dependency>
<dependency>
	<groupId>org.elasticsearch</groupId>
	<artifactId>elasticsearch-hadoop-mr</artifactId>
	<version>6.2.2</version>
</dependency>
<dependency>
	<groupId>org.elasticsearch</groupId>
	<artifactId>elasticsearch-spark-20_2.11</artifactId>
	<version>6.2.2</version>
</dependency>
<dependency>
	<groupId>org.elasticsearch.client</groupId>
	<artifactId>elasticsearch-rest-high-level-client</artifactId>
	<version>6.2.2</version>
</dependency>
<dependency>
	<groupId>org.elasticsearch</groupId>
	<artifactId>elasticsearch-hadoop</artifactId>
	<version>6.2.2</version>
</dependency>
<dependency>
	<groupId>org.elasticsearch.client</groupId>
	<artifactId>elasticsearch-rest-client</artifactId>
	<version>6.2.2</version>
</dependency>
<dependency>
	<groupId>org.elasticsearch</groupId>
	<artifactId>elasticsearch</artifactId>
	<version>6.2.2</version><!--$NO-MVN-MAN-VER$-->
</dependency>
<dependency>
	<groupId>org.elasticsearch.client</groupId>
	<artifactId>transport</artifactId>
	<version>6.2.2</version>
</dependency>
<dependency>
	<groupId>org.elasticsearch.plugin</groupId>
	<artifactId>transport-netty4-client</artifactId>
	<version>6.2.2</version>
</dependency>
<!-- Elasticsearch 설치 후 X-Pack을 설치했기 때문에 추가 -->
<dependency>
	<groupId>org.elasticsearch.client</groupId>
	<artifactId>x-pack-transport</artifactId>
	<version>6.2.2</version>
</dependency>


현재 상태에서 모든 라이브러리가 다 필요한 것은 아니지만 추후 Hadoop이나 Spark와의 연동도 염두에 두고 있기에
그냥 함께 설치하였다.


Excel parsing


Excel 파일을 분석하는 것은 가장 많이 사용하는 POI 라이브러리를 사용하였다.
하지만 일반적으로 알려진 사용법으로는 벌써 이 단계에서 50만건을 처리하는데 OOM이 발생하였다.
해결책을 찾아야 했다. 게으른 개발자의 숙명으로 직접 코딩을 해야 하는 방법보다는 누군가 만들어놓은 라이브러리가
없을까를 우선하여 구글링을 하였다…ㅠ.ㅠ


역시나 세상에는 나같은 불쌍한 중생을 거둬 먹이는 천사같은 분들이 늘 존재한다. 마침 내가 필요로 하는 용도의
라이브러리가 똭! 눈에 띄였다. 이 라이브러리를 설치하여 사용하니 50만건을 OOM 없이 빠른 속도로 parsing해
주었다. 라이브러리는 pom.xml에 다음과 같이 추가하면 된다.


<dependency>
    <groupId>com.monitorjbl</groupId>
    <artifactId>xlsx-streamer</artifactId>
    <version>1.2.0</version>
</dependency>


라이브러리 소스는 아래 링크에서 확인할 수 있다.


https://github.com/monitorjbl/excel-streaming-reader


정리


가장 첫 단계로 Spring boot 및 java 프로그래밍 관련된 내용을 가지고 포스팅을 해보았다. 주 목적이 Spring boot나
java 프로그래밍이 아니므로 부족한 내용이 많겠지만 이와 관련해서는 더 자세하고 정확한 설명이 있는 사이트나 
블로그를 참조하는 편이 더 나을 것이다.


자꾸 요상한(?) 것들에 관심을 가지게 되면서 최종 롤인 iOS 개발자로서도 또 그 이전까지의 롤이었던 java개발자로서도
점점 역량이 떨어지는 것 같다…ㅠ.ㅠ 드문드문 Spring 또는 Spring boot를 접하다보니 별 것 아닌 일로 시간을 
허비하기 일쑤다. 그래도 불행 중 다행이랄까? 요즘은 워낙 양질의 자료를 다양한 경로로 쉽게 구할 수 있다보니 그럭저럭
이정도나마 하고 있지 않나 싶다.


다음 포스팅에서는 Elasticsearch의 JAVA API와 관련된 내용을 조금 더 상세하게 살펴보고 또 Elasticsearch의
설정 몇가지를 함께 알아보겠다.


전체 소스를 공유하려고 했는데 오늘 문득 프로젝트명, github의 레포지터리명, 프로젝트의 패키지명 등이 맘에 들지
않아 전체적으로 수정을 하고 있어 당장에는 힘들 것 같다. 이 시리즈가 마무리되는 시점에(이 글 포함 3개의 포스팅으로
계획 중) 전체 소스를 공유하도록 하겠다.

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^

TensorFlow







목차



3소스코드로 텐서플로우 맛보기 : [CNN] CIFAR-10 ~ cifar10_train.py (이번 글)





소스코드로 텐서플로우 맛보기 : [CNN] CIFAR-10


이제 가장 중요한 부분은 지나갔다.
생각해보면 전체적인 흐름을 먼저 살펴보고 세부적인 내용들을 분석했어야 할 것 같은데 순서가 거꾸로 되어버렸다.
아무래도 전체 포스팅을 마무리한 후 다시 한 번 되짚는 과정을 거쳐야 할 것 같다.


앞서 분석한 내용들은 모델을 구성하고 loss값을 생성하고 optimizer를 적용하는 구체적인 내용들이었다.
처음 딥러닝을 공부할 때는 각각의 단계가 거의 1줄 코딩이었던 것을 생각하면 이 소스는 매우 복잡해보인다.
그러나 세부적인 설정들이 더 추가되었을 뿐 근본적인 맥락은 다를 바가 없다.


자세한 내용은 복습 시간에 다시 살펴보고 오늘은 사용자와 인터페이스하는 소스를 살펴보도록 하자.


cifar10_train.py


소스 분석에 들어가기 전에 참고로 이 소스를 훈련시켰을 때의 정확도가 소스 첫머리의 주석에 표시되어있다.


accuracy



뭐 흙수저가 사용할 수 있을만한 장비는 아닌 듯하니 그냥 그런가보다 하고 넘어가자…-.-

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from datetime import datetime
import time

import tensorflow as tf

import cifar10


첫 3줄은 앞서도 보았듯이 python 2와 3의 호환을 위한 것이고 datetime과 time은 이름에서도 알 수 있듯이 날짜와
시간을 사용하기 위한 것으로 print를 하거나 수행 시간을 체크하기 위한 용도로 import 하였다. 마지막 2줄도 생략

FLAGS = tf.app.flags.FLAGS

tf.app.flags.DEFINE_string('train_dir', '/tmp/cifar10_train',
                           """Directory where to write event logs """
                           """and checkpoint.""")
tf.app.flags.DEFINE_integer('max_steps', 1000000,
                            """Number of batches to run.""")
tf.app.flags.DEFINE_boolean('log_device_placement', False,
                            """Whether to log device placement.""")
tf.app.flags.DEFINE_integer('log_frequency', 10,
                            """How often to log results to the console.""")


FLAG 역시 이전 포스팅에서 설명을 하였는데 그 아래 tf.app.flags.DEFINE_XXX로 지정한 이름으로 그 값을
사용할 수 있다. 즉, FLAG.train_dir은 '/tmp/cifar10_train’라는 값을 가지고 있게 된다. 두 번째 줄에 보면 학습
step을 1000000회로 설정하였다.


train()

# 학습을 실행시키는 함수
def train():
  """Train CIFAR-10 for a number of steps."""

# with tf.Graph().as_default() 문장은 지금까지 만들었던 모든 그래프 구성 요소(operation과 tensor들)을
# 하나의 전역 Graph 안에서 사용하겠다는 의미이다.  
  with tf.Graph().as_default():
# global_step은 학습의 step 카운트를 자동으로 관리해주는 tensor로 사용자가 별도로 step을 카운트
# 할 필요가 없이 이 global_step을 이용하면 된다.
    global_step = tf.train.get_or_create_global_step()

    # Get images and labels for CIFAR-10.
    # Force input pipeline to CPU:0 to avoid operations sometimes ending up on
    # GPU and resulting in a slow down.
# 학습을 수행할 장치를 지정. 첫 번째 CPU를 사용하도록 지정하고 있다. GPU를 사용하는 방법은
# cifar10_multi_gpu_train.py 소스를 참조하면 된다. 비록 multi gpu를 사용하는 소스지만...-.-
    with tf.device('/cpu:0'):
# 학습에 사용할 미니 배치 크기의 image와 label을 가져온다.
# 자세한 내용은 cifar10.py 소스의 distorted_inputs함수 참조
# http://mazdah.tistory.com/814
      images, labels = cifar10.distorted_inputs()

    # Build a Graph that computes the logits predictions from the
    # inference model.
# 학습 모델 생성. 자세한 내용은 cifar10.py 소스의 inference함수 참조
# http://mazdah.tistory.com/814
    logits = cifar10.inference(images)

    # Calculate loss.
# 손실값 계산. 자세한 내용은 cifar10.py 소스의 loss함수 참조
# http://mazdah.tistory.com/814
    loss = cifar10.loss(logits, labels)

    # Build a Graph that trains the model with one batch of examples and
    # updates the model parameters.
# 실제 학습을 수행할 operation 생성. 자세한 내용은 cifar10.py 소스의 loss함수 참조
# http://mazdah.tistory.com/814
    train_op = cifar10.train(loss, global_step)

# 아래 나오는 tf.train.MonitoredTrainingSession에 사용하기 위한 로그 hooker
# MonitoredTrainingSession.run() 호출에 대한 로그들을 hooking하는 역할을 한다.
# Pythons에서는 클래스 선언 시 ( )안에는 상속할 클래스를 지정한다. 즉, _LoogerHook 클래스는
# tf.train.SessionRunHook 클래스를 상속하여 만들어지게 되며 정의된 함수들은 이 클래스의
# 함수들을 Overriding해서 구현한 함수들이다.
    class _LoggerHook(tf.train.SessionRunHook):
      """Logs loss and runtime."""

# session을 이용할 때 처음 한 번 호출되는 함수
      def begin(self):
        self._step = -1
        self._start_time = time.time()

# run() 함수가 호출되기 전에 호출되는 함수
      def before_run(self, run_context):
        self._step += 1
        return tf.train.SessionRunArgs(loss)  # Asks for loss value.

# run() 함수가 호출된 후에 호출되는 함수
      def after_run(self, run_context, run_values):
        if self._step % FLAGS.log_frequency == 0:
          current_time = time.time()
          duration = current_time - self._start_time
          self._start_time = current_time

          loss_value = run_values.results
          examples_per_sec = FLAGS.log_frequency * FLAGS.batch_size / duration
          sec_per_batch = float(duration / FLAGS.log_frequency)

          format_str = ('%s: step %d, loss = %.2f (%.1f examples/sec; %.3f '
                        'sec/batch)')
          print (format_str % (datetime.now(), self._step, loss_value,
                               examples_per_sec, sec_per_batch))

# 분산 환경에서 학습을 실행할 때 사용하는 Session. 분산 환경에 대한 지원을 해준다.
# (Hook를 이용한 로그 관리, 오류 발생시 복구 처리 등)
    with tf.train.MonitoredTrainingSession(
        checkpoint_dir=FLAGS.train_dir,
        hooks=[tf.train.StopAtStepHook(last_step=FLAGS.max_steps),
               tf.train.NanTensorHook(loss),
               _LoggerHook()],
        config=tf.ConfigProto(
            log_device_placement=FLAGS.log_device_placement)) as mon_sess:
      while not mon_sess.should_stop():
# 드디어 마무리~ 학습 operation을 실제로 수행시킨다.
        mon_sess.run(train_op)



main(argv=None)

# CIFAR-10 데이터를 다운로드 받아 저장. cifar10.py 소스 참조
#  http://mazdah.tistory.com/814
cifar10.maybe_download_and_extract()

# 학습 수행 중의 로그를 저장할 디렉토리 생성. 기존에 동일 디렉토리가 있다면 삭제 후 생성.
if tf.gfile.Exists(FLAGS.train_dir):
  tf.gfile.DeleteRecursively(FLAGS.train_dir)
tf.gfile.MakeDirs(FLAGS.train_dir)

# 학습 시작
train()



정리


소스 길이에 비해 분석하는 데 너무 많은 시간이 걸렸다…ㅠ.ㅠ
지난 포스팅에서도 언급한 것처럼 매개 변수나 리턴값들이 모두 tensor 형태이고 TensorFlow의 API 문서에 있는
내용들이 수학적인 내용을 많이 포함하고 있어 다른 언어나 프레임워크의 문서를 읽는 해석하는 것에 비해 원문
해석도 꽤나 어려웠다.


포스팅한 내용에 부정확한 내용이 있을지도 모르겠기에 일단 CIFAR-10 예제 코드를 실제로 돌려보고
그 중간 로그나 결과 값들과 비교해가면서 다시 한 번 찬찬히 살펴볼 필요가 있을 것 같다. 그리고 추후에 이 소스에 
쓰인 API들을 별도로 정리해보겠다.


소스 중에는 아직 평가를 위한 cifar10_eval.py이 남아있는데 요건 우선 학습 관련 내용을 마무리하고 
진행해보도록 하겠다.

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^

Tensorflow









소스코드로 텐서플로우 맛보기 : [CNN] CIFAR-10


지난 포스팅에서 살펴보았던 cifar10_input.py는 데이터를 불러와서 이미지를 임의 조작한 후 배치 사이즈 크기로
나누어 담아 리턴해주는 기능을 하였다. 전체 프로세스의 가장 첫 단계라고도 할 수 있다.


오늘 살펴볼 cifar10.py는 가장 핵심적인 소스로 모델과 네트워크를 구성하는 내용이 주를 이루고 있다.
그만큼 코드의 길이도 전체 소스 중 가장 길다.


차근차근 살펴보도록 하자.


cifar10.py

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

지난 포스팅과 마찬가지로 위 3줄의 import 문은 python 2와 3의 호환성을 위한 것이다.


# OS의 자원을 사용할 수 있게 해주는 모듈
import os
# 정규표현식을 사용할 수 있게 해주는 모듈
import re
# python interpreter에 의해 관리되는 변수나 함수에 접근할 수 있도록 해주는 모듈
import sys
# tar 압축을 핸들링할 수 있게 해주는 모듈
import tarfile

여러 다양한 기능을 사용할 수 있게 해주는 import문들이다. os, sys, tarfile 등은 원격으로 cifar10 데이터셋을
다운로드 받기 위해 쓰인다.


# six 모듈은 python 2와 3의 함수를 함께 사용할 수 있도록 해줌. urllib는 URL 관련 모듈로 역시
# 데이터 셋 다운로드에 사용된다.
from six.moves import urllib
#텐서플로우 모듈
import tensorflow as tf

# 지난 포스팅에서 살펴본 cifar10_input.py 참조
import cifar10_input

몇가지 모듈이 추가로 import 되었으나 대부분 CIFAR10 데이터 셋을 원격으로 다운로드 받기 위한 것으로 이미
별도로 데이터 셋을 다운로드 받아두었다면 무시해도 좋을 것이다.


소스 앞부분에 영문으로 중요한 함수에 대한 설명이 주석으로 달려있다. 일단 간단하게 그 내용을 살펴보면 
다음과 같다.


  • inputs, labels = distorted_inputs( )
    : 학습에 사용할 데이터를 불러온다. 이 함수 안에서 cifar10_input.py에 있는 distorted_inputs( )
    함수를 호출하여 처리한다.
  • predictions = inference(inputs)
    : 파라미터로 전달되는 모델(inputs)에 대한 추론을 계산하여 추측한다.
  • loss = loss(predictions, labels)
    : 해당 라벨에 대해 예측값에 대한 총 손실 값을 구한다.
  • train_op = train(loss, global_step)
    : 학습을 수행한다.


위의 4개 함수가 가장 핵심적인 내용이라 할 수 있다.
이제 전체 코드를 차근차근 살펴보자.


# tf.app.flags.FLAGS는 상수의 성격을 갖는 값을 관리하는 기능을 한다.
# tf.app.flags.DEFINE_xxx 함수를 이용하여 첫 번째 파라미터에 사용할 이름을 넣고
# 두 번째 파라미터에 사용할 값을 설정하면 이후 'FLAGS.사용할 이름' 형식으로 그 값을
# 사용할 수 있다. 아래 첫 번째 코드의 경우 FLAGS.batch_size라는 코드로 128이라는 값을
# 사용할 수 있다.
FLAGS = tf.app.flags.FLAGS

# Basic model parameters.
tf.app.flags.DEFINE_integer('batch_size', 128,
                            """Number of images to process in a batch.""")
tf.app.flags.DEFINE_string('data_dir', '/tmp/cifar10_data',
                           """Path to the CIFAR-10 data directory.""")
tf.app.flags.DEFINE_boolean('use_fp16', False,
                            """Train the model using fp16.""")

# Global constants describing the CIFAR-10 data set.
# cifar10_input.py에 정의되어있던 값들
IMAGE_SIZE = cifar10_input.IMAGE_SIZE  #24
NUM_CLASSES = cifar10_input.NUM_CLASSES  #10
NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN = cifar10_input.NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN  #50000
NUM_EXAMPLES_PER_EPOCH_FOR_EVAL = cifar10_input.NUM_EXAMPLES_PER_EPOCH_FOR_EVAL #10000


# Constants describing the training process.
# tf.train.ExponentialMovingAverage에서 사용할 가중치 값
MOVING_AVERAGE_DECAY = 0.9999     # The decay to use for the moving average.
# 학습 속도 감소 후의 epoch 수
NUM_EPOCHS_PER_DECAY = 350.0      # Epochs after which learning rate decays.
# 학습률 감소를 위한 변수
LEARNING_RATE_DECAY_FACTOR = 0.1  # Learning rate decay factor.
# 초기 학습률
INITIAL_LEARNING_RATE = 0.1       # Initial learning rate.


음…수학을 깊이 들어가긴 싫지만 얼레벌레 그냥 넘어가는 것도 그러니 몇 가지 개념은 좀 알아보고 가자.


Exponential Moving Average

우선 이동평균(Moving Average)라는 것은 특정 기간동안 내에 측정된 값의 평균을 의미한다.
이 이동평균에는 단순이동평균, 가중이동평균, 그리고 여기서 사용하는 지수이동평균이 있는데
이 지수이동평균은 가장 최근 값에 더 큰 가중치를 두어 평균을 계산하는 방식이라고 한다.


일단 위 코드 중 MOVING_AVERAGE_DECAY 이후의 설정들은 모두 학습률 조정을 위한 것으로
train( ) 함수에서 사용을 하게 된다. 기본적으로 학습이 진행됨에 따라 학습률을 기하급수적으로 감소시켜
나가는 방법을 취하고 있다. 자세한 내용은 train( ) 함수 설명에서 다시 한 번 분석해보자.


# If a model is trained with multiple GPUs, prefix all Op names with tower_name
# to differentiate the operations. Note that this prefix is removed from the
# names of the summaries when visualizing a model.
# 멀티 GPU를 사용하여 병렬 처리할 때 작업 이름을 구분하기 위한 구분자...언제 써볼 수 있을까...ㅠ.ㅠ
TOWER_NAME = 'tower'

# CIFAR-10의 데이터 경로
DATA_URL = 'https://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz'


이제부터는 함수를 하나 하나 살펴보도록 하겠다.


_activation_summary(x)

# 이 함수는 전체적으로 각 레이어들을 텐서 보드에 시각화하기 위한 summary를 만드는 작업을 한다.
"""Helper to create summaries for activations.
  Creates a summary that provides a histogram of activations.
  Creates a summary that measures the sparsity of activations.
  Args:
    x: Tensor
  Returns:
    nothing
  """
  # Remove 'tower_[0-9]/' from the name in case this is a multi-GPU training
  # session. This helps the clarity of presentation on tensorboard.
  tensor_name = re.sub('%s_[0-9]*/' % TOWER_NAME, '', x.op.name)
  tf.summary.histogram(tensor_name + '/activations', x)
  tf.summary.scalar(tensor_name + '/sparsity',
                                       tf.nn.zero_fraction(x))


_variable_on_cpu(name, shape, initializer)

# 파라미터로 전달받은 값을 이용하여 CPU를 이용하여 처리할 변수를 생성
"""Helper to create a Variable stored on CPU memory.
  Args:
    name: name of the variable
    shape: list of ints
    initializer: initializer for Variable
  Returns:
    Variable Tensor
  """
# 0번째 CPU를 사용하겠다고 지정
  with tf.device('/cpu:0'):
# python의 3항 연산 FLAGS.use_fp16이 true이면 tf.float16을 사용하고 false이면 
# else 뒤의 tf.float32를 사용. 이 코드에서는 FLAGS.use_fp16를 false로 설정했으므로
# tf.float32를 사용하게 됨
    dtype = tf.float16 if FLAGS.use_fp16 else tf.float32
# 파라미터로 전달된 변수가 이미 존재하면 재활용하고 존재하지 않으면 새로 만든다.
# 참고로 tf.Variable는 무조건 새로운 변수를 만든다. 자세한 사용법은 아래 링크 참조
# https://tensorflowkorea.gitbooks.io/tensorflow-kr/content/g3doc/how_tos/variable_scope/
    var = tf.get_variable(name, shape, initializer=initializer, dtype=dtype)
  return var


_variable_with_weight_decay(name, shape, stddev, wd)

# 위의 _variable_on_cpu(name, shape, initializer) 함수를 이용하여 정규화 처리를 한 변수를 생성.
"""Helper to create an initialized Variable with weight decay.
  Note that the Variable is initialized with a truncated normal distribution.
  A weight decay is added only if one is specified.
  Args:
    name: name of the variable
    shape: list of ints
    stddev: standard deviation of a truncated Gaussian
    wd: add L2Loss weight decay multiplied by this float. If None, weight
        decay is not added for this Variable.
  Returns:
    Variable Tensor
  """
# 데이터 타입 설정
# 세 번째 파라미터는 초기화 함수를 리턴하여 넘기는 것으로 truncated_normal_initializer는
# 정규분포 기반의 초기화 함수로 표준편차의 양 끝단을 잘라낸 값으로 새로운 정규분포를 만들어 
# 초기화 한다.
  dtype = tf.float16 if FLAGS.use_fp16 else tf.float32
  var = _variable_on_cpu(
      name,
      shape,
      tf.truncated_normal_initializer(stddev=stddev, dtype=dtype))

# L2 정규화 처리를 위한 코드. wd(아마도 Weight Decay)값이 None이 아닌 경우 if문
# 안의 코드를 수행하여 정규화 처리를 하고 그래프에 추가한다.
# tf.nn.l2_loss는 전달받은 텐서의 요소들의 제곱의 합을 2로 나누어 리턴한다.
  if wd is not None:
    weight_decay = tf.multiply(tf.nn.l2_loss(var), wd, name='weight_loss')
    tf.add_to_collection('losses', weight_decay)
  return var


위 함수들은 실제로 학습을 진행하면서 결과 값을 예측하는 과정에 사용되는 함수들이다.
자세한 내용들은 올바른 예측을 하기 위한 알고리즘을 구성하는 수학적인 내용이 포함되어있어
당장에는 이해가 쉽지 않다. 예를 들어 tf.truncated_normal_initializer의 경우 정규분포
그래프에서 2개 이상의 표준편차를 제거한 값들로 새롭게 만들어진 그래프로 초기화 한다고 해석이
되는데 사실 내용자체도 이해가 되지 않고 더 심각한 것은 수학적 개념이 포함된 영어를 해석하자니
제대로 해석이 되었는지도 모르겠다…ㅠ.ㅠ 일단은 학습을 최적화 시키고자 하는 목적으로 이러한
장치들을 사용한다는 것만 알아두면 되겠다.


distorted_inputs()

# cifar10_input.py에 있는 같은 이름의 함수를 이용하여 학습할 데이터를 불러온다.
"""Construct distorted input for CIFAR training using the Reader ops.
  Returns:
    images: Images. 4D tensor of [batch_size, IMAGE_SIZE, IMAGE_SIZE, 3] size.
    labels: Labels. 1D tensor of [batch_size] size.
  Raises:
    ValueError: If no data_dir
  """

# 데이터 경로가 지정되어있지 않으면 에러~
  if not FLAGS.data_dir:
    raise ValueError('Please supply a data_dir')

# 데이터 경로를 조합하여 최종적으로 사용할 이미지와 라벨을 가져옴
  data_dir = os.path.join(FLAGS.data_dir, 'cifar-10-batches-bin')
  images, labels = cifar10_input.distorted_inputs(data_dir=data_dir,
                                                  batch_size=FLAGS.batch_size)

# FLAGS.use_fp16 값이 true이면 이미지와 라벨 텐서의 요소들을 tf.float16 타입으로 형변환 한다.
# 하지만 코드에는 False로 지정되어있으므로 무시.
  if FLAGS.use_fp16:
    images = tf.cast(images, tf.float16)
    labels = tf.cast(labels, tf.float16)
  return images, labels


inputs(eval_data)

# 역시 cifar10_input.py에 있는 같은 이름의 함수를 이용하여 평가할 데이터를 불러온다.
# eval_data라는 파라미터가 추가된 것 외에는 distorted_inputs 함수와 내용 동일
"""Construct input for CIFAR evaluation using the Reader ops.
  Args:
    eval_data: bool, indicating if one should use the train or eval data set.
  Returns:
    images: Images. 4D tensor of [batch_size, IMAGE_SIZE, IMAGE_SIZE, 3] size.
    labels: Labels. 1D tensor of [batch_size] size.
  Raises:
    ValueError: If no data_dir
  """
  if not FLAGS.data_dir:
    raise ValueError('Please supply a data_dir')
  data_dir = os.path.join(FLAGS.data_dir, 'cifar-10-batches-bin')
  images, labels = cifar10_input.inputs(eval_data=eval_data,
                                        data_dir=data_dir,
                                        batch_size=FLAGS.batch_size)
  if FLAGS.use_fp16:
    images = tf.cast(images, tf.float16)
    labels = tf.cast(labels, tf.float16)
  return images, labels


inference(images)

# 이 소스의 핵심으로 예측을 위한 모델을 구성하는 함수
"""Build the CIFAR-10 model.
  Args:
    images: Images returned from distorted_inputs() or inputs().
  Returns:
    Logits.
  """
  # We instantiate all variables using tf.get_variable() instead of
  # tf.Variable() in order to share variables across multiple GPU training runs.
  # If we only ran this model on a single GPU, we could simplify this function
  # by replacing all instances of tf.get_variable() with tf.Variable().
  #
  # conv1
# convolution 레이어 1
  with tf.variable_scope('conv1') as scope:
# 커널(필터) 초기화 : 5 X 5 크기의 3채널 필터를 만들며 64개의 커널을 사용한다.
    kernel = _variable_with_weight_decay('weights',
                                         shape=[5, 5, 3, 64],
                                         stddev=5e-2,
                                         wd=None)
    conv = tf.nn.conv2d(images, kernel, [1, 1, 1, 1], padding='SAME')


이 부분은 CNN의 핵심이며 가장 중요한 부분이므로 좀 더 상세하게 알아보자.
일단 필터(커널보다 친숙하므로 앞으로는 ‘필터’로만 표기하겠다. 또한 원칙적으로는 bias까지 +되어야 완성된
필터라 할 수 있으나 우선은 bias를 무시하고 생각해보자)가 하는 역할이 무엇인지부터 알아보면 말 그대로 
이미지에서 지정된 영역의 특징만을 ‘걸러내는’ 역할을 한다. 


그러면 어떤 방식으로 특징을 걸러내는가?
바로 머신러닝이나 딥러닝을 처음 배울때 배웠던 xW + b의 함수를 사용해서 처리한다. 일단 bias는 무시하기로
했으니 xW만 생각해본다면 입력받은 이미지에서 필터와 겹치는 부분을 x라 보고 해당 위치의 필터를 W라 보아
x1* W1 + x2 * W2 + … + xn * Wn이 되는 것이다. 만약 3 X 3 필터를 사용하였다면 아래와 같이 계산할 수
있다.


x1 * W1 + x2 * W2 + x3 * W3 + ... x9 * W9


여기에 만일 입력 채널(이미지의 색상 채널)이 3이라면 각 채널마다 위의 계산을 적용한 후 각 채널별 출력값을
최종 더해서 하나의 feature map을 만들게 된다. 결국 하나의 필터가 하나의 feature map을 만들게 되므로
만일 필터를 여러개 사용한다면 feature map의 개수도 필터의 개수와 동일하게 만들어지고 이 수가 곧 
feature map의 채널이 된다(그리고 이 각각의 채널에 bias를 +하게 된다). 


이 내용을 이해 못해 수없이 구글링을 했으나 적절한 자료를 찾지 못했는데 아래 이미지를 보고 쉽게 이해할 수 있었다.


CNN Filter feature map

이미지 출처 : http://taewan.kim/post/cnn/


이 코드를 가지고 계산을 해보면 24 X 24 크기의 3채널 이미지를 입력으로 받아 5 X 5 크기의 3채널 필터 64개를
사용하고 padding이 원본 크기와 동일한 출력이 나오도록 SAME으로 지정되었으므로 24 X 24 크기에 64 채널을
가진 출력이 나올 것이다. 여기에 배치 사이즈가 128이므로 최종 출력 텐서의 shape는 [128, 24, 24, 64]가 된다.


# 바이어스 초기화
# 채널의 개수가 64개이므로 bias도 64개 설정. biases는 64개의 요소가 0.0으로 채워진
# vector
    biases = _variable_on_cpu('biases', [64], tf.constant_initializer(0.0))

# 가중치 + 바이어스. biases는 conv의 마지막 차수와 일치하는 1차원 텐서여야 한다.
    pre_activation = tf.nn.bias_add(conv, biases)

# 활성화 함수 추가
    conv1 = tf.nn.relu(pre_activation, name=scope.name)

# 텐서 보드에서 확인하기 위한 호출
    _activation_summary(conv1)

  # pool1
# 풀링 레이어 1
# pooling은 간단하게 말해 이미지를 축소하는 단계로 필터로 주어진 영역 내에서 특정한 값(평균,최대,최소)을
뽑아내는 작업이다. 일단 최대값을 뽑는 것이 가장 성능이 좋다고 하여 max pooling을 주로 사용한단다.
# 이 코드에서는 필터 크기가 3 X 3이므로 이 영역에서 가장 큰 값만을 뽑아 사용한다. stride는 2를 사용한다.
  pool1 = tf.nn.max_pool(conv1, ksize=[1, 3, 3, 1], strides=[1, 2, 2, 1],
                         padding='SAME', name='pool1')
  # norm1
# local response normalization라는 정규화 처리인데 ReLu 사용시 에러율 개선에 
# 효과가 있다고 하는데 이 부분은 좀 더 확인이 필요함
  norm1 = tf.nn.lrn(pool1, 4, bias=1.0, alpha=0.001 / 9.0, beta=0.75,
                    name='norm1')

  # conv2
# convolution 레이어 2
  with tf.variable_scope('conv2') as scope:
    kernel = _variable_with_weight_decay('weights',
                                         shape=[5, 5, 64, 64],
                                         stddev=5e-2,
                                         wd=None)
    conv = tf.nn.conv2d(norm1, kernel, [1, 1, 1, 1], padding='SAME')
    biases = _variable_on_cpu('biases', [64], tf.constant_initializer(0.1))
    pre_activation = tf.nn.bias_add(conv, biases)
    conv2 = tf.nn.relu(pre_activation, name=scope.name)
    _activation_summary(conv2)

  # norm2
# local response normalization 2
  norm2 = tf.nn.lrn(conv2, 4, bias=1.0, alpha=0.001 / 9.0, beta=0.75,
                    name='norm2')
  # pool2
# 풀링 레이어 2
  pool2 = tf.nn.max_pool(norm2, ksize=[1, 3, 3, 1],
                         strides=[1, 2, 2, 1], padding='SAME', name='pool2')

  # local3
# fully connected layer 
  with tf.variable_scope('local3') as scope:
    # Move everything into depth so we can perform a single matrix multiply.
    reshape = tf.reshape(pool2, [FLAGS.batch_size, -1])
    dim = reshape.get_shape()[1].value
    weights = _variable_with_weight_decay('weights', shape=[dim, 384],
                                          stddev=0.04, wd=0.004)
    biases = _variable_on_cpu('biases', [384], tf.constant_initializer(0.1))
    local3 = tf.nn.relu(tf.matmul(reshape, weights) + biases, name=scope.name)
    _activation_summary(local3)

  # local4
# fully connected layer 2
  with tf.variable_scope('local4') as scope:
    weights = _variable_with_weight_decay('weights', shape=[384, 192],
                                          stddev=0.04, wd=0.004)
    biases = _variable_on_cpu('biases', [192], tf.constant_initializer(0.1))
    local4 = tf.nn.relu(tf.matmul(local3, weights) + biases, name=scope.name)
    _activation_summary(local4)

  # linear layer(WX + b),
  # We don't apply softmax here because
  # tf.nn.sparse_softmax_cross_entropy_with_logits accepts the unscaled logits
  # and performs the softmax internally for efficiency.
# softmax layer
  with tf.variable_scope('softmax_linear') as scope:
    weights = _variable_with_weight_decay('weights', [192, NUM_CLASSES],
                                          stddev=1/192.0, wd=None)
    biases = _variable_on_cpu('biases', [NUM_CLASSES],
                              tf.constant_initializer(0.0))
    softmax_linear = tf.add(tf.matmul(local4, weights), biases, name=scope.name)
    _activation_summary(softmax_linear)

  return softmax_linear


이 함수의 코드는 Convolutional layer > ReLu layer > Pooling Layer > Norm layer > Convolutional layer 
> ReLu layer > Norm layer > Pooling layer > Fully connected layer > Fully connected layer > 
Softmax layer의 순으로 구성이 되어있는데 이 중 Norm layer가 정확히 어떤 역할을 하는지는 아직 잘 모르겠다.
일단 ReLu를 보조하는 것 같은데 더 알아봐야겠다.


loss(logits, labels)

# 손실 값 계산을 위한 함수
# 아래 주석에서 보이듯 logits 파라미터는 inference() 함수의 리턴 값이고 labels는 distorted_input()
# 또는 input() 함수의 리턴 튜플 중 labels 부분이다. cross entropy를 이용하여 loss를 구한다.
"""Add L2Loss to all the trainable variables.
  Add summary for "Loss" and "Loss/avg".
  Args:
    logits: Logits from inference().
    labels: Labels from distorted_inputs or inputs(). 1-D tensor
            of shape [batch_size]
  Returns:
    Loss tensor of type float.
  """
  # Calculate the average cross entropy loss across the batch.
# 여기서는 sparse_softmax_cross_entropy_with_logits 함수가 사용되고 있는데
# softmax_cross_entropy_with_logits와의 차이라면 softmax_cross_entropy_with_logits
# 함수가 확률분포를를 따른다면 sparse_softmax_cross_entropy_with_logits는 독점적인 확률로
# label이 주어진다고 하는데...무슨 의미인지 잘 모르겠다...ㅠ.ㅠ 확인이 필요한 내용
  labels = tf.cast(labels, tf.int64)
  cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
      labels=labels, logits=logits, name='cross_entropy_per_example')
  cross_entropy_mean = tf.reduce_mean(cross_entropy, name='cross_entropy')
  tf.add_to_collection('losses', cross_entropy_mean)

  # The total loss is defined as the cross entropy loss plus all of the weight
  # decay terms (L2 loss).
  return tf.add_n(tf.get_collection('losses'), name='total_loss')


_add_loss_summaries(total_loss)

# 텐서 보드에 손실값 표시를 위해 손실 값에 대한 summary 추가하고
# 손실값들의 이동 평균을 구하여 리턴. 여기서 사용하는 이동 평균은 가장 최근 값에 가중치를 두는
# tf.train.ExponentialMovingAverage을 사용하여 구한다.
"""Add summaries for losses in CIFAR-10 model.
  Generates moving average for all losses and associated summaries for
  visualizing the performance of the network.
  Args:
    total_loss: Total loss from loss().
  Returns:
    loss_averages_op: op for generating moving averages of losses.
  """
  # Compute the moving average of all individual losses and the total loss.
  loss_averages = tf.train.ExponentialMovingAverage(0.9, name='avg')
  losses = tf.get_collection('losses')
  loss_averages_op = loss_averages.apply(losses + [total_loss])

  # Attach a scalar summary to all individual losses and the total loss; do the
  # same for the averaged version of the losses.
  for l in losses + [total_loss]:
    # Name each loss as '(raw)' and name the moving average version of the loss
    # as the original loss name.
    tf.summary.scalar(l.op.name + ' (raw)', l)
    tf.summary.scalar(l.op.name, loss_averages.average(l))

  return loss_averages_op


train(total_loss, global_step)

# 학습을 실행시키는 함수
"""Train CIFAR-10 model.
  Create an optimizer and apply to all trainable variables. Add moving
  average for all trainable variables.
  Args:
    total_loss: Total loss from loss().
    global_step: Integer Variable counting the number of training steps
      processed.
  Returns:
    train_op: op for training.
  """
  # Variables that affect learning rate.
# 미리 정의한 변수들을 이용하여 러닝 rate를 조정할 파라미터를 결정한다. 
  num_batches_per_epoch = NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN / FLAGS.batch_size
  decay_steps = int(num_batches_per_epoch * NUM_EPOCHS_PER_DECAY)

  # Decay the learning rate exponentially based on the number of steps.
# 학습 step이 증가할 수록 러닝 rate를 기하급수적으로 감소시키도록 처리한다.
# tf.train.exponential_decay 함수는 아래 식의 결과를 리턴한다.
# INITIAL_LEARNING_RATE * LEARNING_RATE_DECAY_FACTOR ^ (global_step / decay_steps)
  lr = tf.train.exponential_decay(INITIAL_LEARNING_RATE,
                                  global_step,
                                  decay_steps,
                                  LEARNING_RATE_DECAY_FACTOR,
                                  staircase=True)
  tf.summary.scalar('learning_rate', lr)

  # Generate moving averages of all losses and associated summaries.
  loss_averages_op = _add_loss_summaries(total_loss)

# Optimizer 설정 및 텐서 보드에 표시하기 위한 summary 생성 후 추가
  # Compute gradients.
  with tf.control_dependencies([loss_averages_op]):
    opt = tf.train.GradientDescentOptimizer(lr)
    grads = opt.compute_gradients(total_loss)

  # Apply gradients.
  apply_gradient_op = opt.apply_gradients(grads, global_step=global_step)

  # Add histograms for trainable variables.
  for var in tf.trainable_variables():
    tf.summary.histogram(var.op.name, var)

  # Add histograms for gradients.
  for grad, var in grads:
    if grad is not None:
      tf.summary.histogram(var.op.name + '/gradients', grad)

  # Track the moving averages of all trainable variables.
  variable_averages = tf.train.ExponentialMovingAverage(
      MOVING_AVERAGE_DECAY, global_step)
  variables_averages_op = variable_averages.apply(tf.trainable_variables())

# tf.control_dependencies 함수는 오퍼레이션간의 의존성을 지정하는 함수로 with와 함께
# 사용하면 파라미터로 전달된 오퍼레이션이 우선 수행된 뒤 다음 문장, 여기서는 with문 아래 있는
# train_op = tf.no_op(name='train')이 수행된다. 
  with tf.control_dependencies([apply_gradient_op, variables_averages_op]):
    train_op = tf.no_op(name='train')

# 이미 알다시피 여기까지는 그저 그래프를 만든 것 뿐, 이제 tf.Session을 통해 run을 하면
# 이전까지 구성된 그래프가 실행된다. 실제로 실행시키는 내용은 cifar10_tranin.py에 들어있다. 
  return train_op


maybe_download_and_extract()

# 웹사이트로부터 CIFAR-10 데이터 셋을 다운로드 받아 사용할 경로에 압축을 풀게 하는 함수
# 이미 별도로 데이터 셋을 받아놓은 경우는 필요 없음
"""Download and extract the tarball from Alex's website."""
  dest_directory = FLAGS.data_dir
  if not os.path.exists(dest_directory):
    os.makedirs(dest_directory)
  filename = DATA_URL.split('/')[-1]
  filepath = os.path.join(dest_directory, filename)
  if not os.path.exists(filepath):
    def _progress(count, block_size, total_size):
      sys.stdout.write('\r>> Downloading %s %.1f%%' % (filename,
          float(count * block_size) / float(total_size) * 100.0))
      sys.stdout.flush()
    filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath, _progress)
    print()
    statinfo = os.stat(filepath)
    print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
  extracted_dir_path = os.path.join(dest_directory, 'cifar-10-batches-bin')
  if not os.path.exists(extracted_dir_path):
    tarfile.open(filepath, 'r:gz').extractall(dest_directory)


정리


핵심적인 내용들이 대부분 들어있는 소스이다보니 잊어버린 내용 되찾으랴 또 생소한 API 확인하랴 시간이
많이 걸렸다.


단지 시간만 많이 걸린 것이면 그나마 다행이지만 꽤 많은 부분을 이해하지 못한다는 것은 참으로 난감한 일이
아닐 수 없다…ㅠ.ㅠ 그래도 기본적인 CNN의 흐름을 따라 어찌어찌 정리는 했지만 여전히 확인해야 할 내용들이
많이 남아있다. 특히나 API의 경우 기본적으로 파라미터나 리턴 값들이 텐서를 기반으로 하고 있는데다가 설명
또한 수학적인 내용이나 용어들을 포함하고 있다보니 java나 python 같은 프로그래밍 언어의 API 문서를
대하는 것과는 그 이해의 차원이 다르다.


일단 중요한 고비는 넘겼으니 다음 포스팅에서 학습을 진행하기 위한 메인 소스인 cifar10_train.py를
살펴보고 그 다음 마지막으로 cifar10_eval.py를 살펴본 후 이 소스 코드에 등장했던 API들을 모두
차근차근 번역해봐야겠다.

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^





목차

1. 소스코드로 텐서플로우 맛보기 : [CNN] CIFAR-10 ~ cifar10_input.py (이번 글)





소스코드로 텐서플로우 맛보기 : [CNN] CIFAR-10


나름 직장 동료들과 열심히 공부를 하고 있고 또 이 딥러닝이라는 분야의 공부를 시작한지도 어언 1년이 다되간다.
하지만 한 때 유행했던 유머처럼 ‘딥러닝을 글로만 배웠어요~’인 상태이다보니 제대로 뭔가를 알고 있는 것인지
감조차 오지 않았다. 그래서 이제야 비로소 예제 코드를 돌려보기로 했다. 


다만 그저 샘플 소스를 다운로드 받고 실행하고 끝! 하는 것이 아닌 적어도 소스 코드가 어떤 의미인지는 알고
돌려보기로 했다. 그 시작으로 CNN쪽에 있는 CIFAR-10 예제를 대상으로 삼았다.


처음에는 함께 공부하는 직장 동료들과 직독직해 식으로 소스를 분석해보려고 했으나…
이런 상황을 ‘자만심 오졌다리~’라고 표현해야 하나…처음 import부터 막혀서 쩔쩔매다가 일단 내가
분석을 좀 하고 내용을 공유하기로 한 것이다.


이러한 형편이니 혹시라도 잘못된 내용이 있으면 따끔한 충고 부탁드린다…^^;;


cifar10_input.py


# sys.path 상의 가장 상위 모듈을 import 하는 것을 보장해 줌. 
from __future__ import absolute_import
# /연산자와 더불어 // 연산자 사용 가능, / 연산자는 실수형을 리턴, // 연산자는 몫 부분만 정수로 리턴
from __future__ import division
# print 함수에 ()를 사용할 수 있게 함
from __future__ import print_function


__future __의 의미 : Python 2에서 Python 3 함수를 사용할 수 있게 해줌
위의 3줄은 Python 2와 Python 3의 호환성을 위한 import이다.


# OS의 자원을 사용할 수있게 해주는 모듈
import os

# six(2 * 3)는 Python 2와 Python 3에서 차이나는 함수들을 함께 사용할 수 있게 해줌
# xrange는 3에서는 range
from six.moves import xrange  # pylint: disable=redefined-builtin
# 아기다리고기다리던 텐서플로우
import tensorflow as tf


데이터를 읽어들이기 위해 OS 자원을 사용하도록 해주고 range의 하위호환성을 위해 xrange를 import 했으며
마지막으로 텐서플로우를 import 함


IMAGE_SIZE = 24


32 X 32 사이즈의 이미지를 랜덤하게 24 X 24 사이즈로 Corp함으로써 전체 데이터셋의 크기가 커진다.


NUM_CLASSES = 10
NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN = 50000
NUM_EXAMPLES_PER_EPOCH_FOR_EVAL = 10000


CIFAR-10 데이터 셋 관련 상수로 총 10개의 클래스(비행기, 자동차, 새, 고양이, 사슴, 개, 개구리, 말, 배, 트럭)가
있으며 학습을 위한 데이터 50000건 테스트를 위한 데이터 10000건으로 구성된다.


이 파일에는 총 4개의 함수가 있으며 각각 다음과 같다.

  • read_cifar10(filename_queue) : 파일 이름 목록을 받아와 CIFAR-10의 바이너리 데이터를 읽고 파싱하여 단일 오브젝트 형태로 반환한다. 이 오브젝트에는 height, width, depth, key, label, uint8image 등의 필드가 있다.
  • _generate_image_and_label_batch(image, label, min_queue_examples, batch_size, shuffle) : image와 label들을 담은 배치용 queue를 만들어 리턴한다.
  • distorted_inputs(data_dir, batch_size) : 데이터셋 확대를 위한 이미지 왜곡 작업을 진행한다.
    read_cifar10 함수를 호출하여 그 리턴 값을 가지고 작업한다. 학습 시 사용.
  • inputs(eval_data, data_dir, batch_size) : 평가를 위한 input에 사용하며 역시 read_cifar10
    함수를 호출하여 사용하며 Crop 외에 다른 조작은 하지 않는다. 


이미 코드에 영문 주석이 다 있지만 추가로 한글 주석을 추가하며 알아보자.


distorted_inputs(data_dir, batch_size)

def distorted_inputs(data_dir, batch_size):
  """Construct distorted input for CIFAR training using the Reader ops.
  Args:
    data_dir: Path to the CIFAR-10 data directory.
    batch_size: Number of images per batch.
  Returns:
    images: Images. 4D tensor of [batch_size, IMAGE_SIZE, IMAGE_SIZE, 3] size.
    labels: Labels. 1D tensor of [batch_size] size.
  """

# os.path.join 함수는 전달받은 파라미터를 이어 새로운 경로를 만드는 함수
# 아래 코드는 이 함수에서 파라미터로 받은 data_dir 경로와 그 경로 아래에 있는
# CIFAR-10의 이미지 파일이 담긴 data_batch_1.bin ~ data_batch_5.bin의
# 5개 파일에 대한 전체 경로를 요소로 하는 벡터(텐서)를 만드는 것이다.
  filenames = [os.path.join(data_dir, 'data_batch_%d.bin' % i)
               for i in xrange(1, 6)]

# 만일 배열 내에 파일 경로가 없으면 에러 발생
  for f in filenames:
    if not tf.gfile.Exists(f):
      raise ValueError('Failed to find file: ' + f)

# string_input_producer 함수는 필수 파라미터인 첫 번째 파라미터에 string 타입의 요소로 만들어진 
# 텐서 타입을 받아서 각 요소 문자열로 구성된 Queue 형태로 리턴을 해준다.
  # Create a queue that produces the filenames to read.
  filename_queue = tf.train.string_input_producer(filenames)

  with tf.name_scope('data_augmentation'):
    # Read examples from files in the filename queue.
# 아래 설명할 read_cifar10 함수로부터 라벨, 이미지 정보 등을 포함한 
# CIFAR10Record 클래스 타입을 톨려받는다.
    read_input = read_cifar10(filename_queue)

# cast 함수는 첫 번째 인자로 받은 텐서 타입의 파라미터를 두 번째 인자로 받은
# 데이터 타입의 요소를 가진 텐서로 돌려준다.
    reshaped_image = tf.cast(read_input.uint8image, tf.float32)

    height = IMAGE_SIZE
    width = IMAGE_SIZE

    # Image processing for training the network. Note the many random
    # distortions applied to the image.

    # Randomly crop a [height, width] section of the image.
# tf.random_crop 함수는 첫 번째 파라미터로 받은 텐서타입의 이미지들을 
# 두 번째 파라미터로 받은 크기로 무작위로 잘라 첫 번째 받은 파라미터와 같은 rank의
# 텐서 형태로 돌려준다. 
    distorted_image = tf.random_crop(reshaped_image, [height, width, 3])

    # Randomly flip the image horizontally.
# 좌우를 랜덤하게 뒤집은 형태의 텐서를 돌려준다.
    distorted_image = tf.image.random_flip_left_right(distorted_image)

    # Because these operations are not commutative, consider randomizing
    # the order their operation.
    # NOTE: since per_image_standardization zeros the mean and makes
    # the stddev unit, this likely has no effect see tensorflow#1458.
# 밝기와 콘트라스트를 랜텀하게 변형시킨 텐서를 돌려준다.
    distorted_image = tf.image.random_brightness(distorted_image,
                                                 max_delta=63)
    distorted_image = tf.image.random_contrast(distorted_image,
                                               lower=0.2, upper=1.8)
# random_crop부터 random_contrast까지는 데이터 셋 확장을 위해 이미지를 임의 조작하는
# 과정이다.

    # Subtract off the mean and divide by the variance of the pixels.
# 이미지를 표준화 하는 과정인 듯한데...어려워서 패쓰~
    float_image = tf.image.per_image_standardization(distorted_image)

    # Set the shapes of tensors.
# 텐서의 shape 설정
    float_image.set_shape([height, width, 3])
    read_input.label.set_shape([1])

    # Ensure that the random shuffling has good mixing properties.
# 전체 테스트용 이미지의 40%, 즉, 총 50000개의 테스트 이미지 중 20000개를 사용
    min_fraction_of_examples_in_queue = 0.4
    min_queue_examples = int(NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN *
                             min_fraction_of_examples_in_queue)
    print ('Filling queue with %d CIFAR images before starting to train. '
           'This will take a few minutes.' % min_queue_examples)

  # Generate a batch of images and labels by building up a queue of examples.
# 배치 작업에 사용할 128개의 이미지를 shuffle하여 리턴함
  return _generate_image_and_label_batch(float_image, read_input.label,
                                         min_queue_examples, batch_size,
                                         shuffle=True)


read_cifar10(filename_queue)

"""Reads and parses examples from CIFAR10 data files.
  Recommendation: if you want N-way read parallelism, call this function
  N times.  This will give you N independent Readers reading different
  files & positions within those files, which will give better mixing of
  examples.
  Args:
    filename_queue: A queue of strings with the filenames to read from.
  Returns:
    An object representing a single example, with the following fields:
      height: number of rows in the result (32)
      width: number of columns in the result (32)
      depth: number of color channels in the result (3)
      key: a scalar string Tensor describing the filename & record number
        for this example.
      label: an int32 Tensor with the label in the range 0..9.
      uint8image: a [height, width, depth] uint8 Tensor with the image data
  """

# 이 함수의 리턴 값은 CIFAR10Record라는 class임 pass는 비어있는 클래스 선언 시 사용
# 이미 아는 바와 같이 텐서플로우의 Session.run이 실행되기 전까지는 비어있는 클래스이며
# Session.run이 실행된 이후에야 데이터 파일의 레코드들이 클래스에 들어가게 된다.
  class CIFAR10Record(object):
    pass
  result = CIFAR10Record()

# label_bytes는 말 그대로 라벨의 길이이고 1byte이다.
# result.height는 이미지의 높이
# result.width는 이미지의 넓이
# result.depth는 이미지를 구성하는 색상 채널
# image_bytes 결국 이미지를 구성하는 총 byte 수는 높이 * 넓이 * 색상 채널

  # Dimensions of the images in the CIFAR-10 dataset.
  # See http://www.cs.toronto.edu/~kriz/cifar.html for a description of the
  # input format.
  label_bytes = 1  # 2 for CIFAR-100
  result.height = 32
  result.width = 32
  result.depth = 3
  image_bytes = result.height * result.width * result.depth

# 모든 레코드는 라벨과 라벨에 해당하는 이미지로 구성되어있으므로  
# 전체 레코드 크기는 label_bytes + image_bytes로 고정
  # Every record consists of a label followed by the image, with a
  # fixed number of bytes for each.
  record_bytes = label_bytes + image_bytes

# tf.FixedLengthRecordReader는 파일로부터 고정길이의 레코드를 출력해주는 클래스
# 생성 시 첫 번째 파라미터는 읽어올 레코드의 바이트 수
  # Read a record, getting filenames from the filename_queue.  No
  # header or footer in the CIFAR-10 format, so we leave header_bytes
  # and footer_bytes at their default of 0.
  reader = tf.FixedLengthRecordReader(record_bytes=record_bytes)

# Queue 타입(FIFO)의 자료 구조를 파라미터로 받아 그 안의 레코드로부터
# Key와 Value를 받아오는 처리. key는 레코드가 포함된 파일명과 index의 구성으로
# 되어있으며, value는 사용할 라벨과 이미지가 포함된 텐서임.
  result.key, value = reader.read(filename_queue)

  # Convert from a string to a vector of uint8 that is record_bytes long.
# byte 타입의 문자열을 숫자형 벡터로 변환. 첫 번째 인자는 문자열로 구성된 텐서이며
# 모든 요소들은 동일한 길이여야 함. 두 번째 인자는 변환할 데이터 타입
  record_bytes = tf.decode_raw(value, tf.uint8)

  # The first bytes represent the label, which we convert from uint8->int32.
# 첫 번째 인자로 받은 텐서를 두 번째 인자로 받은 데이터 타입으로 형변환 함.
# 즉, 아래 코드는 위에서 구성된 record_bytes에서 첫 번째 바이트를 가져와 int32
# 타입으로 변환하여 리턴한다. 따라서 result.label은 1바이트 크기의 int32 타입 요소를
# 갖는 벡터이다.
  result.label = tf.cast(
      tf.strided_slice(record_bytes, [0], [label_bytes]), tf.int32)

  # The remaining bytes after the label represent the image, which we reshape
  # from [depth * height * width] to [depth, height, width].
# tf.reshape는 첫 번째 파라미터의 shape를 두 번째 파라미터로 받은 형태로 바꾼다.
# 아래 코드의 첫 번째 인자는 record_bytes에서 첫 바이트인 라벨을 제외한 나머지
# 바이트(이미지 부분)를 가져와 [3, 32, 32] 형태의 shape로 바꾼다. 
  depth_major = tf.reshape(
      tf.strided_slice(record_bytes, [label_bytes],
                       [label_bytes + image_bytes]),
      [result.depth, result.height, result.width])
  # Convert from [depth, height, width] to [height, width, depth].
# tf.transpose는 첫 번째 파라미터로 받은 텐서의 각 차원 값을 두 번째 파라미터로 전달받은
# 순서로 바꾼 텐서를 리턴한다. 위의 depth_major의 shape는 [3, 32, 32]이다.
# 즉, shape의 0번째 요소는 3, 1번째 요소는 32, 2번째 요소는 32이다. 이 것을 두 번째
# 파라미터처럼 인덱스를 [1, 2, 0]로 바꾸는 것이므로 1 번째 요소인 32가 맨 앞으로, 다음으로
# 2 번째 요소인 32가 오고 0번째 요소인 3은 맨 마지막으로 가게 되는 것이다.
# 결국 최초에 [depth, height, width]의 순서가 [height, width, depth]가 된다.
  result.uint8image = tf.transpose(depth_major, [1, 2, 0])

# 테스트 코드 시작 ##############################################
# 원본 코드에는 없는 내용이지만 아래 코드를 이용하여 간단하게 데이터를 정상적으로 불러왔는지
# 확인할 수 있다. 아래 코드를 싫행하면 총 100개의 이미지가 10 X 10 형태로 배열된 1개의 이미지가
# 만들어지며, label, key, value 값을 확인할 수 있다.
# 이 코드를 사용하려면 matplotlib.pyplot을 import해야 한다.
 fig, ax = plt.subplots(10, 10, figsize=(10, 10))
  with tf.Session() as sess:
      coord = tf.train.Coordinator()
      threads = tf.train.start_queue_runners(coord=coord, sess=sess)

      for i in range(10):
          for j in range(10):
              print(sess.run(result.label), sess.run(result.key), sess.run(value))
              img = sess.run(result.uint8image)
              ax[i][j].set_axis_off()
              ax[i][j].imshow(img)

      dir = os.path.abspath("cifar10_image")
      plt.savefig(dir + "/" + "image")
      print(dir)

      coord.request_stop()
      coord.join(threads)
# 테스트 코드 끝 ############################################

  return result


_generate_image_and_label_batch(image, label, min_queue_examples, batch_size, shuffle)

"""Construct a queued batch of images and labels.
  Args:
    image: 3-D Tensor of [height, width, 3] of type.float32.
    label: 1-D Tensor of type.int32
    min_queue_examples: int32, minimum number of samples to retain
      in the queue that provides of batches of examples.
    batch_size: Number of images per batch.
    shuffle: boolean indicating whether to use a shuffling queue.
  Returns:
    images: Images. 4D tensor of [batch_size, height, width, 3] size.
    labels: Labels. 1D tensor of [batch_size] size.
  """
  # Create a queue that shuffles the examples, and then
  # read 'batch_size' images + labels from the example queue.
# 각각 배치를 생성하는 코드로 shuffle_batch는 무작위로 뒤섞은 배치를 생성하며
# batch는 입력 텐서와 레코드 순서가 동일한 배치를 생성한다. 배치 생성 시 16개의
# thread를 사용한다.
  num_preprocess_threads = 16
  if shuffle:
    images, label_batch = tf.train.shuffle_batch(
        [image, label],
        batch_size=batch_size,
        num_threads=num_preprocess_threads,
        capacity=min_queue_examples + 3 * batch_size,
        min_after_dequeue=min_queue_examples)
  else:
    images, label_batch = tf.train.batch(
        [image, label],
        batch_size=batch_size,
        num_threads=num_preprocess_threads,
        capacity=min_queue_examples + 3 * batch_size)

  # Display the training images in the visualizer.
# 텐서보드에서 이미지를 보여주긴 위한 코드
  tf.summary.image('images', images)

# 배치 과정을 거친 이미지와 라벨의 최종 shape는 각각 [128, 32, 32, 3]과 [128]이다.
  return images, tf.reshape(label_batch, [batch_size])


inputs(eval_data, data_dir, batch_size)

"""Construct input for CIFAR evaluation using the Reader ops.
  Args:
    eval_data: bool, indicating if one should use the train or eval data set.
    data_dir: Path to the CIFAR-10 data directory.
    batch_size: Number of images per batch.
  Returns:
    images: Images. 4D tensor of [batch_size, IMAGE_SIZE, IMAGE_SIZE, 3] size.
    labels: Labels. 1D tensor of [batch_size] size.
  """

# eval_data 값에 따라 학습용 데이터를 불러올지 평가용 데이터를 불러올지 결정한다.
  if not eval_data:
    filenames = [os.path.join(data_dir, 'data_batch_%d.bin' % i)
                 for i in xrange(1, 6)]
    num_examples_per_epoch = NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN
  else:
    filenames = [os.path.join(data_dir, 'test_batch.bin')]
    num_examples_per_epoch = NUM_EXAMPLES_PER_EPOCH_FOR_EVAL

# 이후 코드는 이미지 변형 (random_flip_left_right, random_brightness,
# random_contrast) 처리를 제외하고는 distorted_inputs(data_dir, batch_size)
# 함수와 동일하다.
  for f in filenames:
    if not tf.gfile.Exists(f):
      raise ValueError('Failed to find file: ' + f)

  with tf.name_scope('input'):
    # Create a queue that produces the filenames to read.
    filename_queue = tf.train.string_input_producer(filenames)

    # Read examples from files in the filename queue.
    read_input = read_cifar10(filename_queue)
    reshaped_image = tf.cast(read_input.uint8image, tf.float32)

    height = IMAGE_SIZE
    width = IMAGE_SIZE

    # Image processing for evaluation.
    # Crop the central [height, width] of the image.
    resized_image = tf.image.resize_image_with_crop_or_pad(reshaped_image,
                                                           height, width)

    # Subtract off the mean and divide by the variance of the pixels.
    float_image = tf.image.per_image_standardization(resized_image)

    # Set the shapes of tensors.
    float_image.set_shape([height, width, 3])
    read_input.label.set_shape([1])

    # Ensure that the random shuffling has good mixing properties.
    min_fraction_of_examples_in_queue = 0.4
    min_queue_examples = int(num_examples_per_epoch *
                             min_fraction_of_examples_in_queue)

  # Generate a batch of images and labels by building up a queue of examples.
  return _generate_image_and_label_batch(float_image, read_input.label,
                                         min_queue_examples, batch_size,
                                         shuffle=False)


정리


늘 어처구니 없는 실수가 따라다닌다.
CIFAR-10 홈페이지에 가면 다음과 같이 데이터 셋이 3가지 버전이 있다.

  • CIFAR-10 python version
  • CIFAR-10 Matlab version
  • CIFAR-10 binary version (suitable for C programs)


나는 Tensorflow가 python 기반으로 코딩이 되므로 당연히 python versiond을 받아야 한다고 생각했다.
그런데 python 버전을 사용하여 코드를 실행하다보니 뭔가 이상했다. 간간히 데이터를 제대로 불러왔는지
확인하기 위한 print문에 이상한 결과가 찍히는 것이다. CIFAR10Record 클래스의 멤버들에 대한 shape나
rank는 물론 중간에 시험삼아 100개의 이미지를 출력한 것도 모든 이미지가 깨져서 나왔다.



주말 2일을 고민하다가 문득 원래의 코드에는 파일명을 가져올 때 .bin이라는 확장자가 있었는데 내가 사용하는
데이터 파일에는 확장자가 없는 것을 발견했다. 그리고 겨우 내가 잘못된 버전의 데이터 셋을 받았다는 것을 
깨달았다…ㅠ.ㅠ


새로 받은 버전의 데이터 셋은 아래와 같이 이미지가 정상적으로 나왔다.



이제 겨우 파일 하나 분석해봤을 뿐인데 벌써 지친다…특히나 텐서라는 개념과 행렬 연산 그리고 Tensorflow의
지연 실행이라는 메커니즘은 정말 적응이 안된다…ㅠ.ㅠ 다음 포스팅에서는 cifar10.py 파일을 분석해보자.

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^


Data Analysis : Prologue - EMQ + Kafka + openTSDB + Grafana


사실 이 포스팅의 제목을 어떻게 정해야 할지 고민이 많았다. 일단 클러스터 설치는 마쳤으니 당연히 이전 포스팅의
카테고리에서는 벗어나야 할 것이고, 실제 데이터를 다루기 시작하였으니 데이터 분석의 시작은 시작인데…
수집되는 데이터 자체가 딱히 분석할만한 데이터는 아니고…암튼 조금은 모호한 지점에 있는 작업이 되어버렸다.
더군다나 본격적인 데이터 분석은 아직 한참 더 공부한 이후 시작될 터인데…


하지만 역시나 데이터를 다루기 시작했으니 데이터 관련 제목을 붙여야 하겠다고 결정을 했다. Prologue라는 부제와
함께. 이후 The Beginning을 시작하게 되겠지만 아마도 시간이 조금 걸리지 않을까 싶다. 오늘은 가볍게나마
이전에 클러스터 구성 내용을 정리할 때 누락된 openTSDB와 Grafana 설치에 대한 내용과 EMQ -> Kafka ->
openTSDB -> Grafana로 이어지는 데이터 수집/저장/시각화 과정을 정리해보도록 하겠다.


시나리오


직전 포스팅(아두이노 온습도계 제작)에서도 언급했듯이 좀 더 그럴싸한 데이터를 모아 분석을 해보고 싶었지만
막상 작업을 시작하고 보니 마땅히 모을 수 있는 데이터가 없었다. 그렇다고 데이터를 찾아 다닐수만은 없기에
우선 쉽게 접근할 수 있는 것으로부터 시작해보기로 했다. 시나리오는 다음과 같다.


  1. 아두이노로 제작된 온습도 및 먼지 센서를 통해 온도,습도,먼지 농도,시간 데이터 수집
  2. MQTT 프로토콜을 통해 EMQ 서버로 데이터 전송
  3. EMQ 서버로 publishing 된 데이터를 Kafka에서 subscribing
  4. Kafka에 들어온 데이터를 openTSDB에 저장
  5. openTSDB에 저장된 데이터를 Grafana로 시각화
  6. Spark를 이용하여 데이터 분석 (이상 감지)


EMQ를 통해 M2M 통신을 하는 상황이거나 데이터 누락을 어느 정도 허용하는 작업이라면 굳이 Kafka가 필요하진
않을 것이다. 하지만 일단 데이터를 수집하고 분석하는 작업을 전제로 하였기에 누락되는 데이터를 Kafka에서 1차로
완충하도록 구성하였다. 최종 분석단계에 해당하는 6번 Spark를 이용한 데이터 분석은 아직 좀 더 학습이 진행되어야
할 부분이다.


openTSDB 설치 및 실행


최근 시계열 데이터를 처리하는 NoSQL로는 influxDB가 대세인듯하다. 다만 openTSDB가 HBase를 기반으로
작동한다는 한 가지 이유만으로 나는 openTSDB를 선택하였다. 설치 및 연동 과정에서 후회를 많이 했지만 결국
불굴의 의지로 제대로 연동 시키고야 말았다…ㅠ.ㅠ influxDB가 대세라는 것은 구글링만 해봐도 충분하다. 이놈에
openTSDB는 검색을 해도 설치 외에는 쓸만한 정보가 없더라…-.-


간단하게 openTSDB의 개요를 말하자면 TSD라는 데몬을 통해 외부의 시계열 데이터를 HBase에 생성된 테이블에
저장하는 구조이다.




설치 시 유의사항

현재 최신 릴리즈는 2.3.0 이며 pre-release가 2.4.0 RC2까지 나와있다. 아래 설명할 kafka와의 연동을
위한 opentsdb-rpc-kafka라는 plugin을 사용하기 위해서는 반드시 2.4.0 RC2 버전을 설치해야 한다.
나는 이 사실을 모르고 2일간 삽질을 했다…ㅠ.ㅠ 


설치는 역시 간단하다. 압축 파일을 다운로드 받아 적당한 디렉토리(나는 당근 /opt)에서 압축을 풀고 압축을 푼
디렉토리 내에 있는 build.sh를 실행시키는 것만으로 설치는 끝이다. 


path


다만 앞서 말한 것처럼 openTSDB는 HBase 기반으로 작동을 하기 때문에 HBase가 이미 설치 되어있다는 것을 
전제로 한다. 나는 이미 이전에 클러스터를 구성하였기에 바로 설치에 들어갔다.


설치가 끝나면 build.sh 파일이 있는 그 경로에 build라는 경로가 추가되고 그 안에 openTSDB 실행 파일 등이
위치하게 된다. 설치가 끝나면 가장 먼저 테이블을 생성해주어야 한다. 테이블 생성 스크립트는 build/src 아래에
있다. 아래와 같이 실행해준다.


$ cd /opt/opentsdb/build
$ env COMPRESSION=NONE HBASE_HOME=/opt/hbase ./src/create_table.sh


이렇게 실행하면 ‘tsdb’, ‘tsdb-uid’, ‘tsdb-tree’, ‘tsdb-meta’ 이렇게 4개의 테이블이 HBase 상에 생성된다.


HBase

그리고 마지막으로 설정을 한다. 설정파일은 최초에 압축을 푼 디렉토리 밑에 있는 src 디렉토리에 있다.
나의 경우에는 /opt/opentsdb/src가 그 경로이며 파일 이름은 opentsdb.conf이다. 이 파일을 build로
복사한다. 즉, /opt/opentsdb/build로 복사를 하는 것이다. 수정해야 할 설정 내용은 간단하다.


# The TCP port TSD should use for communications
# *** REQUIRED ***
tsd.network.port = 4242
...
# The location of static files for the HTTP GUI interface.
# *** REQUIRED ***
tsd.http.staticroot = /opt/opentsdb/build/staticroot

# Where TSD should write it's cache files to
# *** REQUIRED ***
tsd.http.cachedir = /tmp/opentsdb
...
# Whether or not to automatically create UIDs for new metric types, default
# is False
tsd.core.auto_create_metrics = true


여기까지 모두 마쳤으면 /opt/opentsdb/build아래 있는 tsdb를 다음과 같이 실행해준다.


$ /opt/opentsdb/build/tsdb tsd


실행한 콘솔창에 로그가 올라가는 것이 보일 것이고 localhost:4242로 접속을 하면 아래 이미지와 같은 콘솔
화면을 볼 수 있다.


openTSDB


센서 데이터의 포맷 변경


기존에 아두이노 온습도계에서 EMQ 서버로 전송하던 데이터는 포맷을 좀 바꾸어야 한다. openTSDB에 저장하기
위한 데이터 포맷은 JSON 형식으로 모두 3가지 유형이 있다.


  • Metric
  • Aggregate
  • Histogram


자세한 설명은 생략하고…-.- 나는 가장 기본적인 Metric 형태로 데이터를 저장하기로 했다. Metric 형태로 저장할
경우 JSON 포맷은 다음과 같다(여러건을 한번에 보내려면 아래 형식을 배열 안에 넣으면 된다).


{
    "metric": "sys.cpu.nice",
    "timestamp": 1346846400,
    "value": 18,
    "tags": {
       "host": "web01",
       "dc": "lga"
    }
}


metric은 전체 측정 단위를, timestamp는 이름 그대로 long 타입의 타임스탬프를 value는 측정 값을, tags는
세부 분류를 위한 키-값 쌍을 추가하면 된다. 나의 예로 좀더 자세하게 살펴보자.


[
   {
      "type": "Metric",
      "metric": "mqtt.home.pcroom",
      "timestamp": 1346846400,
      "value": 22.5,
      "tags": {
         "type": "temperature",
         "loc": "pcroom"
      }
   },
   {
      "type": "Metric",
      "metric": "mqtt.home.pcroom",
      "timestamp": 1346846400,
      "value": 18.2,
      "tags": {
         "type": "humidity",
         "loc": "pcroom"
      }
   },
   {
      "type": "Metric",
      "metric": "mqtt.home.pcroom",
      "timestamp": 1346846400,
      "value": 22.3,
      "tags": {
         "type": "dust",
         "loc": "pcroom"
      }
  }
]


위 내용을 살펴보면 우선 같은 시점에 측정된 온도/습도/먼지의 측정값을 배열로 넘기고 있다. 이 데이터를
만드는 것은 아두이노에서 처리하며 관련 내용은 아래 링크의 소스코드 부분을 참조하면 된다.


아두이노를 이용한 온도/습도/먼지 측정기


최상위에 있는 type 값은 openTSDB 자체를 위한 것이 아니라 opentsdb-rpc-kafka plugin에서 필요한
것이다. 3가지 유형 중 하나를 지정하기 위한 속성이다.



metric의 값은 임으로 정하면 되는 것으로 나의 경우 그 의미는 mqtt를 이용하여 home에 있는 컴퓨터방
pcroom의 측정값이라는 뜻이다. 


timestamp와 value는 달리 설명할 부분이 없고 tags에 보면 type에 각각 temperaturehumiditydust
라는 값이 들어있는데 의미 그대로 온도와 습도와 먼지를 뜻한다.


이렇게 3개의 값을 하나의 배열에 포함시켜 보내는 것이다. tags에서 loc는 큰 의미는 없다. 필요에 따라 추가적인
값을 지정하면 된다.


Kafka와 openTSDB 연동


openTSDB 자체에 대한 자료 혹은 openTSDB와 Grafana 연동에 대한 자료는 그럭저럭 찾을 수 있었는데
Kafka와 openTSDB와의 연동에 대한 자료는 거의 찾을 수가 없었다. 구글링을 하면 가장 상위에 검색되는
내용이 2개인데 하나는 opentsdb-rpc-kafka라는 openTSDB의 plugin이고 다른 하나는 
kafka-connect-opentsdb라는 Kafka connector이다. 뭔가 사용하기에는 connector쪽이 쉬워보였으나
openTSDB의 HTTP API를 이용한다는 점이 맘에 들지 않았다. 그래서 opentsdb-rpc-kafka를 선택하게
되었고 그렇게 고난의 길은 시작되었다…ㅠ.ㅠ


일단 opentsdb-rpc-kafka는 Github에서 소스를 내려받은 후 빌드를 하여 jar 파일을 만들어야 한다.
소스는 Maven 프로젝트로 되어있어 이클립스에서 빌드하거나 또는 프로젝트 디렉토리 아래에서 man package
명령으로 빌드하면 된다.


플러그인 소스 경로는 다음과 같다.

https://github.com/OpenTSDB/opentsdb-rpc-kafka


이렇게 빌드된 jar 라이브러리를 /opt/opentsdb/build 아래에 plugins 디렉토리를 만들어 그 안에 넣는다.
plugin 라이브러리의 최종 위치는 다음과 같다. 그리고 이 plugin의 경로는 반드시 classpath에 추가시켜주어야 한다.


$ /opt/opentsdb/build/plugins/opentsdb-rpc-kafka-2.3.2-SNAPSHOT.jar


그리고 opentsdb.conf 설정 파일에 plugin과 관련된 내용을 추가해주어야 하는데 아래와 같은 내용을 설정 파일의
제일 하단에 추가해주면 된다. 물론 각 설정 값은 각자의 환경에 맞게 입력해야 한다. 고정이라고 주석을 달아놓은
설정 외에는 각자 환경에 맞춰 수정을 하자.


# --------- PLUGINS ---------------------------------
tsd.core.plugin_path = /opt/opentsdb/build/plugins
## 고정
tsd.core.storage_exception_handler.enable = true
## 고정
tsd.core.storage_exception_handler.plugin = net.opentsdb.tsd.KafkaStorageExceptionHandler
## 고정
tsd.rpc.plugins = net.opentsdb.tsd.KafkaRpcPlugin
KafkaRpcPlugin.kafka.zookeeper.connect = rpi1:2181,rpi2:2181,rpi3:2181
KafkaRpcPlugin.kafka.metadata.broker.list = rpi1:9092,rpi2:9092,rpi3:9092
KafkaRpcPlugin.groups = mqtt
KafkaRpcPlugin.mqtt.topics = mqtt-kafka
## 고정
KafkaRpcPlugin.mqtt.consumerType = raw
## 고정
KafkaRpcPlugin.mqtt.deserializer = net.opentsdb.data.deserializers.JSONDeserializer
#KafkaRpcPlugin.mqtt.rate = 5
KafkaRpcPlugin.mqtt.threads = 3
## 고정
KafkaRpcPlugin.seh.topic.default = seh


앞서 말한 바와 같이 이 플러그인을 사용하기 위해서는 openTSDB 2.4.0 RC2 버전을 설치해야 한다.
한동안 자바 개발을 등한시 했더니 아주 사소한 것을 파악하지 못해 삽질을 따따블로 했다. 분명 빌드는 잘 되는데
plugin 설치하고 openTSDB를 기동하면 쌩뚱맞게 속성에 접근하지 못한다느니, 메소드가 존재하지 않는다느니
하는 메시지가 나오면서 plugin이 제대로 작동하지 않는 것이다.


나중에 확인해보니 소스에 포함된 openTSDB 라이브러리는 2.4.0 버전이고 내가 설치한 openTSDB는 2.3.0
이었다. 전체적인 패키지 구조는 같지만 클래스 내에 변화가 있었기 때문에 속성이나 메소드 사용에 문제가
있었던 것이다. 확인 후 부랴부랴 2.4.0 RC2 버전으로 다시 설치를 하였다.


이제 다시 한 번 openTSDB를 실행해보자(이미 실행중이라면 실행 중인 콘솔에서 ctrl+c를 눌러 종료시키자).
드디어 Kafka에서 데이터를 쭈~욱 쭉 뽑아오는 모습을 볼 수 있을 것이다.

 

data


이 데이터는 이전에 클러스터 관련 포스팅 중 EMQ와 Kafka의 연동을 통해 가져온 데이터들이다. 관련 내용은 아래 
링크에서 확인할 수 있다.


Cluster : The Beginning - Apache Kafka와 EMQ 연동

주의 사항

EMQ에서 Kafka로 전송된 데이터의 포맷은 형태가 다르다. 실제로 openTSDB에 저장하기 위한 내용은
payload 부분에 들어있는데 이 payload의 값은 Base64 인코딩이 되어 저장된다. kafka와 EMQ 연동을
위해 사용한 connector에서 그렇게 처리를 하고 있다. 따라서 opentsdb-rpc-kafka plugin에서 데이터를
제대로 처리하기 위해서는 Kafka에서 받아온 값에서 payload의 값만을 추출한 뒤 이 값을 Base64 디코딩
하여 전달해야 한다. 


이 부분을 처리해야 하는 소스는 net.opentsdb.tsd.KafkaRpcPluginThread.java의 run 함수이다.


이렇게 모든 과정이 끝나면 앞서 localhost:4242로 접근했던 웹 콘솔을 통해 쿼리가 가능하다. 그런데 어쩐 일인지
나의 경우 오류가 발생하면서 데이터 조회가 되지 않았다. 콘솔창에 찍히는 URL이 조금 이상해 보이긴 한데 그것이
원인인지는 잘 모르겠다. 결국 openTSDB 자체 웹 화면에서 데이터를 조회하는 것은 성공하지 못했다.


openTSDB


Grafana 설치 및 설정


적어도 설치에 있어서는 지금껏 설치한 모든 시스템 중 Grafana가 가장 친절하였다.
운영체제별로 다운로드 버튼이 있고 이 버튼을 누르면 해당 운영체제에 설치하는 방법이 나온다.
나는 현재 Mac mini에 설치를 하고 있기 때문에 Mac에 대한 설치 방법을 제공해 주는 것이 얼마나
반가웠는지 모른다…ㅠ.ㅠ 설치는 매우 간단하다.


brew update 
brew install grafana


이렇게 설치를 하고나면 다음과 같은 내용이 콘솔 화면에 표시된다.


To have launchd start grafana now and restart at login:
  brew services start grafana
Or, if you don't want/need a background service you can just run:
  grafana-server —config=/usr/local/etc/grafana/grafana.ini —homepath /usr/local/share/grafana cfg:default.paths.logs=/usr/local/var/log/grafana cfg:default.paths.data=/usr/local/var/lib/grafana cfg:default.paths.plugins=/usr/local/var/lib/grafana/plugins


친절하게 Grafana의 실행 방법을 알려주는 것이다. 나는 그냥 간단하게 brew를 통해 실행을 하였다.
설정파일은 상당히 긴 편인데 따로 변경해주어야 할 것은 아무것도 없다. 웹 화면의 포트를 바꾸는 정도?


openTSDB와의 연동


설정을 바꾸지 않았다면 localhost:3000으로 Grafana 웹 콘솔에 접근할 수 있다. 처음 접속하면 계정을 물어보는데
기본 계정은 admin / admin이다. 접속을 해서 가장 먼저 수행해야 하는 작업은 Datasource를 연결하는 것이다.
Grafana는 다양한 DB와 연동 가능한데 type 항목에서 select box를 클릭하면 다음과 같이 목록이 표시된다.



나머지 설정들은 자료도 많고 하니 참고해서 입력하면 되는데 나같은 경우 HTTP settings에서 Access를 Direct로
설정하니 데이터 소스에 연결이 되지 않았다. Access를 proxy로 설정하고 HTTP Auth를 With Credentials로
설정하니 비로소 데이터소스 연결에 성공했다는 메시지가 표시되었다. 이 과정에서도 왜 연결이 안되는지 Grafana의
소스까지 까뒤집고 난리를 치면서 상당한 시간을 보냈다…ㅠ.ㅠ 전체적인 설정은 다음과 같다(OpenTSDB settings
의 Version은 현재 2.3까지만 선택 가능한데 2.4.0 RC2를 설치한 경우에도 2.3을 선택하면 된다).


Grafana


Datasource를 성공적으로 연결했다면 이제 대시보드를 만들면 된다. 좌측 메인 메뉴에서 Dashbord에 마우스를
올리면 나타나는 서브 메뉴에서 + New를 선택하여 새로운 대시보드를 만드는데 처음 나타나는 화면은 대시보드에
표현할 유형을 선택하는 화면이다. 나머지는 차차 알아보고 나는 우선 맨 앞의 Graph를 선택했다.


Grafana


Graph를 선택하면 아래 이미지와 같이 비어있는 Graph panel이 덩그러니 하나 나온다. 여기서 상단의 Panel Title을
클릭하면 몇가지 메뉴가 나오는데 Edit를 선택하자


Grafana


Edit를 선택하면 Graph panel 하단으로 Graph 조건을 입력하는 화면이 나온다. 역시 다른 항목들은 차차
알아보기로 하고 중요한 몇가지만 살펴보자.


먼저 General 항목으로 가서 Graph 이름을 지정해준다. 그러면 Graph 화면 상단에 있던 Panel Title이라는
문구가 지정해준 이름으로 바뀐다. 일단 TEST라고 입력해 보았다.


Grafana


다음 가장 중요한 Metric 설정이다. 역시 많은 항목이 있지만 꼭 필요한 2가지만 설정해보자.
먼저 Metric 값을 입력한다. 기억하시겠지만 openTSDB로 넘기는 JSON 문자열에 metric이라는 키가 있는데
바로 그 값을 넣어주면 된다. 나는 mqtt.home.pcroom이었다. Metric 값만 입력해도 벌써 챠트가 그려지는데
나의 경우 하나의 Metric에 온도/습도/먼지 3가지 데이터가 들어있었다. 


이 시점에 나타나는 그래프는 설정화면의 Metric 옆에 보이는 Aggregator가 sum으로 되어있기 때문에 
온도 + 습도 + 먼지의 값으로 그려진 그래프다. 한마디로 쓰레기다…-.- 물론 이런 값이 필요한 경우도 분명 있을 
것이다. 각각의 데이터로 그래프를 그리기 위해서는 tags에 지정한 구분값을 추가로 입력해주어야 한다. 


아래 그림과 같이 내가 tags에 지정한 type키와 그에 대한 값인 temperature를 입력해주었다. 입력 후 우측에 
있는 add tag 버튼을 눌러야 반영된다.


Grafana

Grafana


이제 온도에 대한 데이터만 Graph에 표시된다.
그렇다면 하나의 Graph에 여러 데이터를 표시하려면 어떻게 하면 될까?
Edit 영역 하단에 보면 Add Query라는 파란색 버튼이 있다. 이 버튼을 클릭하면 동일한 입력 폼이 하나 더 생기며
이렇게 새로 생긴 입력 폼에 tags를 구분해서 입력해주면 된다.


Grafana


최종 화면은 요렇게 보인다.


Grafana


정리


이거 은근히 내용이 길어져버렸다. 달리 생각하면 이 작업에 그만큼 많은 시행착오와 노력이 들어갔다는 말이 되겠지만
최근에는 시계열 데이터도 Elasticsearch와 Kibana를 이용하여 처리하는 것이 대세인 것 같아 내가 이러려고
openTSDB를 선택했던가 하는 자괴감이 든다는 철지난 개그가 절로 나오게 되었다…ㅠ.ㅠ 아무래도 본격적으로
데이터 처리를 하게 되면 바로 이 Elasticsearch + Kibana 조합으로 가지 않을까 싶다.


그래도 클러스터 구성에서부터, 작은 데이터를 가지고 클러스터의 일부분만 사용하는 작업이긴 하지만 실제 데이터를
다루어보았다는 점에서 상당한 만족감을 느낀다.


원래 최종 목표는 이렇게 수집한 데이터를 인공지능으로 분석하는 것이었다. 그리고 그 내용에는 트위터의 데이터를
수집하여 RNN 분석을 하는 것도 있었는데 앞으로의 계획이 너무 거창해져서 일단 그 부분은 보류를 해야 할 것 같다.


앞으로의 계획은 이렇다.

  1. 아두이노와 메카넘휠을 이용한 차량형 로봇 제작, 수집 대상은 모터 회전수, 초음파 센서를 이용한 장애물과의 거리 데이터, 충돌 센서를 이용하여 장애물과의 충돌이 발생했을 때의 false 정보, 바퀴의 둘레와 모터 회전수를 조합한 이동 거리 정보, 기타 부가 정보로 카메라를 이용한 영상 정보 등이다.
  2. 정보의 활용은 2가지이다. DQN을 이용한 강화학습을 통해 낮은 수준의 자율주행 구현이 그 하나이고 gazebo simulator를 이용한 Digital Twin을 구현하는 것이 다른 한가지다.


무식하면 용감하다고…그래도 그간 조금씩 습득해온 지식으로는 뭔가 어렵지 않게 될 것 같기도 한데 현실은 
어떨지…-.-


이제 긴 여정을 위해 잠시 학습하는 시간을 갖고 이후 블로그 포스팅은 주로 학습 내용을 정리하는 수준이 될 것 같다.
그럼 다음에…

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^




아두이노를 이용한 온도/습도/먼지 측정기


일단 EMQ + Kafka + Hadoop + HBase + Spark의 구성으로 클러스터 구성 작업은 마무리가 되었다.
그리고 현재 시점에 추후 정리하겠지만 센서 데이터 처리를 위한 openTSDB와 시각화를 위한 Grafana까지
추가로 설치를 완료한 상태이다.


구슬이 서말이라도 꿰어야 보배라고 아무리 클러스터를 구성하고 각종 시스템을 그럴듯하게 설치를 해놔봐야
전기 먹는 하마 밖에 더 되겠는가? 각 시스템의 용도에 맞는 처리를 해보고 그 흐름을 알아야 진정한 목표를 이루는
것일 터다. 게다가 개발자의 역할을 갖고 있는 나로서는 아마 백날가도 운영을 위한 클러스터에 하둡을 설치할
기회는 없을 것이다. 이렇게 보면 진정한 목표는 이러한 빅데이터 생태계에서 개발자로서의 역할을 활용할 지점을
찾는 것이라 보아야 한다.


어쨌든 기본적으로 필요한 것은 데이터이고 그래서 우선 간단하게 아두이노와 온습도 센서, 먼지 센서 등을 이용하여 
센서데이터를 수집해보기로 했다. 하지만 전혀 간단하지 않았다…ㅠ.ㅠ 오늘은 그 힘든 여정을 살펴보자


흔하디 흔한…하지만 유니크한…


사실 내가 하는 작업의 대다수는 거의 4~ 5년 전에 이미 누군가 시도했던 일들이다. 앞서 진행한 클러스터 구성도
그렇지만 아두이노를 이용하여 온도/습도/먼지를 측정하고 이를 데이터로 저장한 후 챠트를 이용하여 시각화 해서
보여주는 작업은 웬만큼 아두이노를 아는 사람들이라면 이미 오래 전에 해보았던 작업들이다.


그런데…남들 보다 센서 하나 더 붙인 것이 이렇게 특이한 상황을 만들어낼 줄은 꿈에도 몰랐다…ㅠ.ㅠ


일단 구성품을 보자.


아두이노 MEGA 2560

아두이노 메가 2560


ESP8266-01 Wi-Fi 모듈

ESP8266-01


DHT22 온습도 센서

DHT22


GP2Y1010 미세먼지 센서

GP2Y1010


DS3231 RTC 모듈

DS3231


네오픽셀 LED 모듈

네오픽셀 LED


LCD 모듈 (Nokia 5110)

Nokia 5110 LCD


몇가지 부품들은 설명이 좀 필요할 것 같은데…우선 RTC 모듈은 시간의 흐름에 따른 데이터를 저장해야 하기에
시간 정보가 필요하였고 부가적으로 화면에 시간과 날짜를 표시하기 위해 추가하였다. 다음으로 네오픽셀 LED
모듈의 경우 먼지 센서의 측정값에 따라 직관적으로 농도를 표시하기 위해 LED가 필요했는데 일반 LED로 4가지
색상을 표시하거나 RGB LED를 사용하기에는 아두이노 나노에 남는 입출력 핀이 부족하여 조금 비싸지만 네오픽셀
LED를 사용하게 되었다.


주의사항


혹시라도 그럴리는 없겠지만… 이 글을 보면서 바로 작업을 진행하는 분들이 계실지 몰라 주의해야 할 사항을 먼저 
언급해 두어야겠다.


처음 작업을 시작할 때에는 완성품의 부피를 줄이고자 아두이노 나노로 시작을 하였다. 핀수가 적긴 하지만 그래도
위에 언급한 부품들을 꼭 맞게 연결할 수 있을 정도는 되었다. 그러나 다른 곳에서 문제가 생겼다. 많은 센서들을
연결하다보니 사용하는 라이브러리 수가 늘어났고 또 Wi-Fi 모듈을 통해 MQTT 통신을 해야 하다보니 문자열
코딩이 많아졌다.


사실 아두이노로 그래도 적지 않은 작업을 했지만 역시 문돌이는 문돌이라 여전히 모르는 것 천지다보니 처음에는
완성된 온습도계가 자꾸 리셋되는 이유를 몰랐다. 이곳 저곳을 검색한 후 아두이노의 메모리 문제라는 것으로 판단하고 
그 해결책으로 pgmspace를 include하여 문자열에 대해 F함수 처리를 하였지만(F(“string”)) 그것으로는 역부족이었다. 


결국 아두이노를 나노에서 메가로 교체를 하였다(사실 아두이노의 스펙을 잘 몰라 중간에 우노도 한 번 사용을 했다).
이렇게 교체되는 과정에서 아두이노 사이즈가 커지는 바람에 나노나 우노에 맞춰 만든 외장 케이스가 못쓰게 되어버렸다.
열심히 드릴질까지 해가면서 야심차게 만들었는데…ㅠ.ㅠ


참고로 아두이노에는 3가지 영역의 메모리가 있으며 그 중에 개발자들이 활용할 수 있는 영역은 SRAM이다.
아두이노 종류에 따른 메모리 용량은 아래 표와 같다(나노는 우노와 동일하다).



소프트웨어적으로는 MQTT 통신을 위한 PubSubClient 라이브러리에서 주의를 좀 해야 한다.
이 라이브러리를 초기화 하기 위해서는 Wi-Fi 객체가 Client 타입이어야 하며 이를 지원하는 라이브러리가
몇가지 있는데 나는 그 중에 WiFiEsp를 사용하였으며 이러한 라이브러리를 사용하기 위해서는 ESP8266 모듈의
펌웨어도 AT25-SDK112 firmware라는 펌웨어를 사용해야 했다.


결론은…많은 센서를 사용하게 된다면 입출력 핀 문제 뿐만 아니라 메모리 문제 때문에라도 아두이노 메가를 사용하는
것이 안전하다는 점이다.


아두이노와 센서들의 연결


워낙에 많은 센서들을 부착하다보니 전체 배선도를 표시하는 것은 그리기도 또 읽기도 쉽지 않을 것 같아
각각의 센서 연결도를 따로따로 보여주도록 하겠다.


ESP8266-01


DHT22


GP2Y1010


DS3231 RTC


네오픽셀 LED


Nokia 5110 LCD


코드 작성


전체 코드가 조금 길기는 하지만 우선 주석과 함께 모두 올려본다.


//  Wi-Fi 모듈을 PubSubClient와 함께 사용하기 위해 선택한 ESP8266용 라이브러리
#include <WiFiEsp.h>
#include <WiFiEspClient.h>

// MQTT 통신을 위한 라이브러리
#include <PubSubClient.h>

// DHT22 온습도 센서 라이브러리
#include <DHT.h>

// DS3231 RTC 모듈 라이브러리
#include <DS3231.h>

// Nokia5110 LCD 사용을 위한 라이브러리
#include <Adafruit_GFX.h>
#include <Adafruit_PCD8544.h>

// 네오픽셀 LED 사용을 위한 라이브러리
#include <Adafruit_NeoPixel.h>

// 메모리 관리를 위한 F함수 또는 PROGMEM을 사용하기 위한 헤더파일. 아두이노 메가를 사용하는
// 시점에서 큰 의미는 없어졌음
#include <avr/pgmspace.h>

// MQTT 서버 주소
IPAddress server(123, 123, 123, 123);
// 공유기 SSID와 비밀번호
char ssid[] = "ABCDEFG";
char pass[] = "password";

// Wi-Fi 모듈 상태를 표시하기 위한 변수
 int status = WL_IDLE_STATUS;

// WI-Fi 모듈 객체 선언
WiFiEspClient espClient;
// Wi-Fi 객체를 파라미터로 MQTT 통신을 위한 clietn 생성
PubSubClient client(espClient);

// DHT22 사용을 위한 데이터 핀 설정과 모델 타입 매크로 선언
#define DHTPIN    5
#define DHTTYPE   DHT22

// DHT22 사용을 위한 객체 생성
DHT dht(DHTPIN, DHTTYPE);

// GP2Y1010 측정 핀과 측정을 위한 내부 LED 연결 핀 설정
int measurePin = A0;
int ledPower = 6;
 
// 먼지 측정을 위해 필요한 각종 변수 선언 (자세한 내용은 타 사이트 참조)
int samplingTime = 280;
int deltaTime = 40;
int sleepTime = 9680;
 
float voMeasured = 0;
float calcVoltage = 0;
float dustDensity = 0;

float totSum = 0;
float totCnt = 1.0;
float prevD = 0.0;

// DS3231 RTC 모듈 객체 생성
DS3231  rtc(SDA, SCL);
// 날짜와 시간 표시를 위한 변수
char *dateStr;
char *prevDateStr = "";

// Nokia 5110 LCD 사용을 위한 객체 생성
Adafruit_PCD8544 display = Adafruit_PCD8544(7, 8, 9, 10, 11);

// 네오픽셀 LED  사용을 위한 핀 번호 및 LED의 index 매크로 선언
#define PIN 4   
#define LEDNUM 1  
// 네오픽셀 LED 사용을 위한 객체 생성
Adafruit_NeoPixel strip = Adafruit_NeoPixel(LEDNUM, PIN, NEO_GRB + NEO_KHZ800);

void setup() {
  // 시리얼 초기화
  Serial.begin(9600);
  // Wi-Fi 모듈 사용을 위한 Serial1 초기화 
  Serial1.begin(9600);
  WiFi.init(&Serial1);

  // Wi-Fi 모듈 연결 여부 확인
  if (WiFi.status() == WL_NO_SHIELD) {
    while (true);
  }
  
  // AP에 접속
  while ( status != WL_CONNECTED) {
    status = WiFi.begin(ssid, pass);
  }

  // MQTT를 위한 서버 정보 설정 및 콜백함수 설정. 나는 MQTT publishing만 할 것이기에
  // 콜백 함수는 빈 함수로 남겨둠
  client.setServer(server, 1883);
  client.setCallback(callback);
  
  // 기타 설정들은 다름 함수로 뺌
  otherSetting();
  delay(2000);
}

void otherSetting() {
  // DS3231 RTC 모듈 초기화
  rtc.begin();

  // 시작 날짜와 시간을 설정하는 코드로 최초 사용시에만 필요함
//  rtc.setDate(25, 1, 2018);
//  rtc.setTime(23, 21, 45);
//  rtc.setDOW(4);
  
  // 네오픽셀 LED 초기화
  strip.begin();
  pinMode(ledPower,OUTPUT);

  // Nokia 5110 LCD 초기화
  display.begin();
  display.setContrast(65);
  display.setTextSize(1);
  display.setTextColor(BLACK);

  // DHT22 초기화
   dht.begin();
}

void callback(char* topic, byte* payload, unsigned int length) {
  //MQTT 통신에서 subscrib시 필요한 내용이나 나의 경우 publish만 하므로 그냥 비워둠
}

void loop() {
  // 메인 루프에서는 MQTT 통신이 끊어져있으면 연결을 하고 연결되어있으면 데이터를 모아서
  // MQTT 서버로 전송하는 작업을 진행함
  if (!client.connected()) {
    // 연결이 끊어져있으면 다시 연결 시도	
    reconnect();
  } else {
    // MQTT 서버에 연결되어있으면 데이터를 수집하여 MQTT 서버로 보냄
    getValueAndSendData();
    // 위 작업을 5초마다 한 번씩 수행
    delay(5000);
  }
}

void getValueAndSendData() {
  // 온도, 습도측정 값을 담을 변수를 선언하고 DHT22 센서로부터 측정값을 받아옴
  float temperature = dht.readTemperature(); 
  float humidity = dht.readHumidity();
  // 먼지 센서의 경우 조금 복잡한 계산이 필요하여 따로 함수 처리를 함
  float d = getdust();

    // 간혹 먼지 센서에서 노이즈값이 나오므로 이에 대한 처리를 해줌(값자기 100 가까운 값이
    // 나오기도 하나 음수에 대해서만 처리했음
    if (d < 0.0) {
      d = prevD;
    }

    prevD = d;

    // DS3231로부터 요일값을 받아옴
    dateStr = rtc.getDOWStr(FORMAT_SHORT);

    // 현재 먼지 센서의 측정 값을 순간값이 아닌 그 시점까지의 그날 평균으로 보여주고 있으며(1일 평균)
    // 날짜가 바뀌면 다시 처음부터 계산하도록 처리함
    if (prevDateStr == dateStr) {
      totSum += d;
      totCnt += 1.0;
    } else {
      totSum = d;
      totCnt = 1.0;
    }

    prevDateStr = dateStr;

    float avgD = totSum / totCnt;

    // 먼지 측정 값에 따라 네오픽셀 LED의 색상을 변경함
    if (avgD <= 30) {
      strip.setPixelColor(0, 0, 0, 255); 
    } else if (avgD <= 80) {
      strip.setPixelColor(0, 0, 255, 0); 
    } else if (avgD <= 150) {
      strip.setPixelColor(0, 255, 255, 0); 
    } else if (avgD >= 151) {
      strip.setPixelColor(0, 255, 0, 0); 
    }
    strip.show();

    // 센서 데이터들을 JSON 포맷으로 만들어 MQTT 서버로 전송함
    // 이 JSON 포맷은 시계열 데이터용 NoSQL인 openTSDB에 넣기 위한 포맷임
    // 주의할 것은 PubSubClient 라이브러리는 전송 가능한 패킷 사이즈가 128byte로 
    // 기본 설정되어 더 큰 크기의 패킷 전송을 위해서는 MQTT_MAX_PACKET_SIZE 값을
    // 변경해주어야 한다. 나는 넉넉하게 1024로 설정했다.
    char attributes[393];
    String dtStr = String(rtc.getUnixTime(rtc.getTime()));
    String payload = F("[");
          payload += F("{");
          payload += F("\"type\":\"Metric\"");
          payload += F(",");
          payload += F("\"metric\":\"mqtt.home.pcroom\"");
          payload += F(",");
          payload += F("\"timestamp\":"); payload += dtStr; 
          payload += F(",");
          payload += F("\"value\":"); payload += String(temperature); 
          payload += F(",");
          payload += F("\"tags\": {"); 
          payload += F("\"type\":\"temperature\",");
          payload += F("\"loc\":\"pcroom\"");
          payload += F("}},");
          
//    payload.toCharArray( attributes, 131 );
//    client.publish("/mqtt", attributes);
    //Serial.println(attributes);
    
          payload += F("{");
          payload += F("\"type\":\"Metric\"");
          payload += F(",");
          payload += F("\"metric\":\"mqtt.home.pcroom\"");
          payload += F(",");
          payload += F("\"timestamp\":"); payload += dtStr; 
          payload += F(",");
          payload += F("\"value\":"); payload += String(humidity); 
          payload += F(",");
          payload += F("\"tags\": {"); 
          payload += F("\"type\":\"humidity\",");
          payload += F("\"loc\":\"pcroom\"");
          payload += F("}},");

//    payload.toCharArray( attributes, 131 );
//    client.publish("/mqtt", attributes);
    //Serial.println(attributes);
    
          payload += F("{");
          payload += F("\"type\":\"Metric\"");
          payload += F(",");
          payload += F("\"metric\":\"mqtt.home.pcroom\"");
          payload += F(",");
          payload += F("\"timestamp\":"); payload += dtStr; 
          payload += F(",");
          payload += F("\"value\":"); payload += String(avgD); 
          payload += F(",");
          payload += F("\"tags\": {"); 
          payload += F("\"type\":\"dust\",");
          payload += F("\"loc\":\"pcroom\"");
          payload += F("}}");
          payload += F("]");
    
    // String 타입으로 만든 JSON 문자열을 byte 배열로 변경한 후 서버로 publishing
    // "/mqtt"는 토픽 이름
    payload.toCharArray( attributes, 393 );
    client.publish("/mqtt", attributes);

    //Serial.println(attributes);
    
    // 날짜, 시간, 요일, 온도, 습도, 먼지 등의 값을 LCD에 출력
    display.clearDisplay();   
    display.setCursor(0,0);
    display.print(rtc.getTimeStr(FORMAT_SHORT));
    display.print(F(" ["));
    display.print(dateStr);
    display.print(F("]\n"));
    display.println(rtc.getDateStr(FORMAT_LONG, FORMAT_BIGENDIAN, '-'));
    display.print(F("\nT: "));
    display.println(temperature, 1);
    display.print(F("H: "));
    display.println(humidity, 1);
    display.print(F("D: "));
    display.print(avgD, 1);
    display.display();
}

// MQTT 서버에 연결이 안되었을 시 재연결하는 함수
void reconnect() {
  while (!client.connected()) {
    // 아래 connect 함수를 통해 전달하는 3개의 문자열은 그냥 임의로 정해도 됨
    if (client.connect("ARDUINO", "mazdah", "abcdefg")) {
      Serial.println(F("connected"));
    } else {
      Serial.print(F("failed"));
      delay(5000);
    }
  }
}

// 먼지 측정을 위한 함수
// 고수들은 한번 측정된 값을 사용하지 않고 짧은 시간에 여러번 측정을 하여 그 평균을 사용함
float getdust(){
  digitalWrite(ledPower,LOW);
  delayMicroseconds(samplingTime);
  voMeasured = analogRead(measurePin);
  
  delayMicroseconds(deltaTime);
  digitalWrite(ledPower,HIGH);
  delayMicroseconds(sleepTime);
  
  calcVoltage = voMeasured * (5.0 / 1024);
  dustDensity = (0.17 * calcVoltage - 0.1) * 1000.0;
  return(dustDensity);
}


코드가 좀 길기는 하지만 어려운 코드는 없다. 다만 나의 경우 앞서 말한 것 처럼 메모리 용량이 적은 아두이노를 
사용했다가 원인불명(?)의 리셋으로 한참을 시간 낭비 했고 MQTT 라이브러리의 특성을 잘 몰라 또 한참 시간을 
낭비하게 되었다. MQTT 관련한 시간 낭비 중 하나는 코드에 주석처리한 전송 패킷 사이즈 문제였고 다른 한가지는
지속적으로 데이터를 송신하지 않으면 네트워크가 끊어지는 것이 당연한데 데이터를 보내지도 않으면서 자꾸
MQTT 네트워크가 끊어지는 것이 코드나 센서 이상이라고 생각해서 꽤나 고민했다. 결국은 내가 데이터를 보내기
시작하면서 이해를 하게 되었다.


결과물


처음 아두이노 나노와 우노를 이용해서 만들었을 때 그에 준하여 케이스를 만들었다. 예전에 샀던 블루투스
스피커의 아크릴 포장 케이스가 마침 딱 알맞은 크기였기에 거기에 공기를 통하게 하기 위한 구멍을 열심히
뚫어서 나름 예쁜 모양을 만들었는데…


메모리 문제로 아두이노 메가로 바꾼 후 더이상 아두이노와 센서들이 깔끔하게 케이스 안으로 들어가질 않는다.
결국 내장을 뽑아낸 채 이렇게 볼쌍 사나운 모습으로 열심히 데이터를 모으고 있다…ㅠ.ㅠ


사실 모듈 배치를 고려하지 않은 부분도 있었다. 아무래도 연결된 센서(특히 ESP8266)나 아두이노 자체에서도
발열이 있기에 아무리 구멍을 뚫었다 할지라도 좁은 공간에 함께 넣게 되면 온도 측정에 심각한 영향을 미치게
된다. 먼지 센서도 가급적이면 통풍이 잘되도록 신경을 써주어야 한다.


정리


시작할 때 생각했던 것보다 많이 어려웠고 또 시간도 많이 잡아먹었다.
하지만 그만큼 새로운 사실들도 많이 알게되어 결코 낭비만은 아니었다고 생각한다.


사실 애초에 온/습도 또는 먼지를 측정하여 처리하는 내용은 워낙 흔해서 수집할만한 다른 데이터가 없을지
많이 고민을 해보았다. 그런데 은근히 만만치가 않았다. 생각 외로 아두이노 센서 중에 데이터를 수집할만한
센서가 그리 많지 않았다. 어떤 데이터를 모을까 생각하며 시간을 축내기보다는 흔한 작업이지만 일단 데이터를
모아보자는 생각에 이와 같은 작업을 진행하게 되었다. 사실 중요한 것은 앞서 구성한 클러스터를 통해
정상적으로 데이터를 처리할 수 있는지 확인하는 것이 우선이었으니까…


다음 포스팅에서 정리하겠지만 우선 이렇게 만든 온습도계를 통해 데이터를 모아 MQTT 서버를 거치고 Apache
Kafka를 통해 openTSDB에 저장을 하고 Grafana를 통해 시각화 하는 부분까지는 정상적으로 된 것 같다
(하지만 세부적으로는 Kafka쪽에 더 살펴봐야 할 것이 남아있다). 맛보기로 현재 들어오고 있는 데이터에 대한
Grafana 챠트를 올린다.

grafana


일단 기본적인 작업을 성공하였기에 다음에는 조금 더 큰 작업을 해보려고 한다. 내 수준에서 가능할지는 모르겠지만
메카넘 휠을 사용한 차량형 로봇으로 모터의 회전 수와 회전 방향, 거리 센서를 통한 데이터, 충돌 센서 및 충돌 스위치 
센서를 통해 충돌시 실패 감지 등의 데이터를 수집하고 이를 인공지능으로 분석하여 낮은 수준의 자율주행차를 만들어
볼 계획이다. 내 지식 수준에서 바로 될 수는 없는 장기 목표라서 올해 말쯤에나 결과를 볼 수 있겠지만…^^;;;


이 작업이 성공하면 다음은 자율주행 드론에 도전!!!


암튼 하고 싶은 것이 널려서 참 행복한 한 해가 될 것 같다.
그래도 우선 다음 포스팅에서는 센서데이터 -> MQTT -> Kafka -> openTSDB -> Grafana로 이어지는 데이터
처리에 대해 알아보도록 하겠다.

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^









Cluster : The Beginning - Spark 2.2.1 설치


이제 클러스터 구축의 마지막 포스팅이다!
엄밀하게 말해 제대로 된 테스트를 진행하지 않았으니 껍데기만 설치해놓으 것이나 다름 없지만
이 분야의 초보자나 다름 없는 입장에서 일단 이렇게나마 설치를 해놓고 실제 데이터를 분석하면서 부족한
부분을 채워 나가는 것이 더 바람직하지 않나 하는 판단이다.


사실 Kafka 이후 어떤 내용을 더 추가할 것인가 고민을 좀 했었다. 워낙 방대한 하둡 생태계에서 무엇을 선택해야 할지
길을 잃었다고 하는 것이 옳은 표현이겠다. 하지만 선택은 빨랐다. 하둡 생태계의 많은 시스템들이 주목을 받고 있지만
그 중에서도 Spark의 지위는 상당한 것으로 보였다. 게다가 아무래도 머신러닝과 딥러닝 등 인공 지능이 대세인 이 
시점에서 머신 러닝 라이브러리인 Spark ML을 제공해주는 Spark는 분명 배워볼만한 시스템임이 틀림 없었다.


다만…In-Memory 시스템인 Spark를 겨우 8Gb 메모리를 장착한 가정용 PC 5대로 원활히 돌려볼 수 있을지는
의문이다.


사전 준비


Spark의 클러스터 관리를 위한 시스템 설치(Mesos또는 Hadoop YARN) 외에 특별한 사전 준비가 필요한 것은 
아니지만 JAVA 버전은 1.8.X 이상이어야 한다. 공식 문서에 보니 2.1.X 버전에서는 JAVA 1.7까지 지원했으나
2.2.0으로 버전이 올라가면서 1.8 이상을 필요로 하게 되었다.


설치


타 시스템과 다를 바 없이 먼저 바이너리 압축 파일을 다운로드 받는다 내가 받은 파일은 아래와 같다.


spark-2.2.1-bin-hadoop2.7.tar.gz


앞선 포스팅에서도 언급했듯이 나는 현재 모든 하둡 생태계의 시스템들을 /opt 아래에 설치하고 있기에 Spark 역시
/opt 압축을 풀었다. 생성된 디렉토리에서 버전정보는 뺐기 때문에 최종 경로는 /opt/spark가 되었다.


설정


기본적인 설정 파일은 spark-defaults.conf와 slaves 파일이며 cluster 모드로 설정하기 위해서는 spark-env.sh에
필요한 환경 변수를 설정해주면 된다. 나는 YARN을 이용한 cluster모드로 설치하기로 했기에 다음과 같이 설정하였다.


spark-env.sh

export JAVA_HOME=/Library/Java/Home
export HADOOP_HOME=/opt/hadoop
export SPARK_HOME=/opt/spark

##하둡과 YARN의 설정파일 경로를 추가해준다.
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop


spark-defaults.conf

## 클러스터 매니저가 접속할 마스터 서버 URI
spark.master                     spark://SECONDARY-NAMENODE.local:7077
##Spark 이벤트를 기록할지 여부. 응용 프로그램이 완료된 후 웹 UI를 재구성하는 데 유용
spark.eventLog.enabled           true

# spark.eventLog.dir               hdfs://NAMENODE.local:8021/sparkdir
##이벤트 로그를 기록할 경로
spark.eventLog.dir		 file:///opt/spark/sparkeventlog

##네트워크를 통해 전송되거나 직렬화 된 형식으로 캐시되어야하는 객체를 직렬화하는 데 사용할 클래스
spark.serializer                 org.apache.spark.serializer.KryoSerializer

##드라이버 프로세스, 즉 SparkContext가 초기화되는 곳에 사용할 메모리 크기
##클라이언트 응용프로그램에서 직접 변경하면 안됨
spark.driver.memory              2g

##YARN 관련 설정들

##YARN 응용 프로그램 마스터에 사용할 메모리 크기
##클러스터 모드에서는 spark.driver.memory를 사용하라는데...필요 없는 설정일지도...-.-
spark.yarn.am.memory             1g

##정적 할당에 대한 집행자의 수. spark.dynamicAllocation.enabled를 사용하면 실행 프로그램의 
##초기 세트가 최소한이 정도 커짐
spark.executor.instances           2
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

##실행 프로그램에 추가할 JVM 옵션 설정. 스파크 속성 또는 최대 힙 크기 (-Xmx) 설정을 지정하는 것은 안됨
spark.executor.extraJavaOptions        -Dlog4j.configuration=file:/opt/spark/conf/log4j.properties

##드라이버에 추가할 JVM 옵션 설정. 스파크 속성 또는 최대 힙 크기 (-Xmx) 설정을 지정하는 것은 안됨
spark.driver.extraJavaOptions        -Dlog4j.configuration=file:/opt/spark/conf/log4j.properties


slaves

##슬레이브 노드로 사용할 서버들의 호스트명
DATANODE1.local
DATANODE2.local
DATANODE3.local


마지막으로 이전 포스팅에서도 언급한 바와 같이 Mac에 설치하는 경우 spark-env.sh파일에서 nohup으로 시작하는
명령행에서는 nohup을 삭제한 후 실행시켜야 한다.


클러스터 노드에 배포


설정이 모두 끝났으면 전체 배포본을 압축하여 클러스터를 구성할 각 노드들에 복사를 해준다.
나는 DATANODE1.local, DATANODE2.local,DATANODE3.local 3대의 노드를 슬레이브 노드로 사용하기에
아래와 같이 복사를 해주었다.


$ scp spark.tar.gz hadoop@DATANODE1.local:/opt
$ scp spark.tar.gz hadoop@DATANODE2.local:/opt
$ scp spark.tar.gz hadoop@DATANODE3.local:/opt

그리고 각각의 노드에서 압축을 풀어주면 된다.


실행


마스터 노드에서 아래와 같이 실행한다.

$ /opt/spark/sbin/start-all.sh 


실행 확인


마스터 노드에서 포트번호 8080으로 접속하게 되면 Spark 관리 콘솔이 브라우저 화면에 보여진다.



정리


이렇게 Spark를 마지막으로 클러스터를 구성하는 1차 목표가 완료되었다.
물론 이후에도 몇가지 목적에 의해 openTSDB를 추가로 설치하였으며 앞으로 또 어떤 시스템을 더 설치하게 될지
모르겠다. 하지만 애초에 설치를 목표로 했던 시스템들은 모두 설치가 되었다.


하지만 구슬이 서말이라도 꿰어야 보배라고…이렇게 설치한 클러스터를 이용하여 실제 데이터의 흐름을 이해하고
또 이렇게 모아진 데이터를 분석하는 더 큰일이 남아있다. 


전에도 말했지만 이미 Spark까지의 모든 시스템은 1월 초에 모두 설치가 완료되었고 이 블로그를 정리하는 시점에는 


아두이노를 이용한 온도/습도/먼지 센서를 이용하여 데이터를 수집하는 작업을 시작했는데 역시나 생각만큼 쉽지 않다.
잠시 쉬는 기간이라 생각하고 차근차근 연구를 해봐야 할 것 같다.


본격적인 싸움은 이제부터다!!!

블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^








Cluster : The Beginning - HBase 1.2.6 설치


직전에 포스팅한 Hadoop 설치를 기점으로 복잡한 과정은 거의 끝이 난 듯싶다.
사실 하둡에서 바로 MapReduce를 사용하거나 Spark와 같은 분석 툴을 설치하는 것으로 마무리해도 좋겠으나
그래도 구색을 갖춰보려고 굳이 NoSQL을 하나 설치해보기로 했다.


HBase는 예전에 설치해보기도 하였고 Cassandra가 더 좋은 성능을 보인다는 말도 있고 또 진입 장벽도 비교적
낮다고 하여 Cassandra를 설치해볼까 고민을 하다가 그래도 Hadoop을 설치해놓은 터라 아무래도 HBase를
설치하는 것이 낫다고 판단되어 결국은 HBase로 결정을 하였다.


하지만 개인적으로 학습하는 수준이라는 것이 뻔한 것이다보니 과한 짓을 하는 것은 아닌가 하는 생각도 든다.
(그렇게 따지자면 지금껏 해온 모든 작업이 다 우도살계인 격이지만…-.-)


설치


앞서 포스팅한 다른 시스템들과 마찬가지로 바이너리 패키지를 다운로드 받아 적절한 위치에 압축을 풀면 설치
끝이다. 나는 1.2.6 버전을 다운로드 받아 /opt 아래 압축을 풀었다. 다른 시스템들과 마찬가지로 버전 번호는
과감히 삭제하고 HBase 홈 디렉토리를 /opt/hbase로 만들었다.




사전 준비


HBase를 설치하기 꺼려졌던 가장 큰 이유는 HBase도 클러스터 관리를 위해 zookeeper를 사용한다는 점이었다.
그리고 예전에 처음 설치할 때부터 의문이었지만 zookeeper가 필요한 시스템들 각각을 위해 별도의 zookeeper
앙상블을 만들어야 하는 것인지 아니면 하나의 zookeeper 앙상블로 다수의 클러스터 시스템을 관리할 수 있는 것인지
하는 부분도 잘 알지 못했다. 잘 알지 못하다보니 가급적이면 zookeeper와 엮이지 않는 시스템을 찾고 싶기도 했다.


그러던 차에 구글 그룹과 호튼웤스 커뮤니티에서 도움이 될만한 글타래들을 찾아 어느정도 답을 얻을 수 있었다.
사실 zookeeper에 대해 조금만 공부를 했더래도 알 수 있는 내용이었지만…


결론부터 말하자면 하나의 zookeeper 앙상블로 다수의 클러스터(나의 경우 Kafka, HA Hadoop, HBase)를
관리할 수 있으며 웬만큼 큰 규모가 아니면 3노드 정도의 앙상블로 충분하다는 것이다(처음 Kafka와 HBase를
설치했을 때는 zookeeper 앙상블을 5대의 노드에 구성했더랬다…-.-).


관련 글타래를 아래에 링크한다.


https://groups.google.com/forum/#!topic/storm-user/cYSZE8RDHJ0

https://community.hortonworks.com/questions/35287/how-to-decide-how-many-zookeepers-should-i-have.html 



설정



Hadoop에 비한다면 크게 설정할 것은 없다.

backup-master

# HA 클러스터를 구성할 경우 백업 마스터로 사용할 호스트명을 기록한다.
SECONDARY-NAMENODE.local


base-site.xml

<!-- 
HBase가 지속적으로 실행되면서 regionserver들과 정보를 공유하는 경로
Hadoop의 core-site.xml에 있는 fs.defaultFS 설정과 동일하게 한다
--> 
<property>
	<name>hbase.rootdir</name>
	<value>hdfs://NAMENODE.local:8020/hbase</value>
</property>
<!--
마스터가 될 노드의 URI
-->
<property>
	 <name>hbase.master</name>
	<value>NAMENODE.local:6000</value>
</property>
<!--
zookeeper 앙상블을 구성하는 노드들의 호스트명
나의 경우 Kafka 설치시 설치했던 라즈베리파이의 호스트명을 적었다.
-->
 <property>
	 <name>hbase.zookeeper.quorum</name>
	<value>rpi1,rpi2,rpi3</value>
</property>
<!--
zookeeper 클라리언트가 사용할 포트. 기본 포트인 2181을 사용한다.
-->
<property>
	<name>hbase.zookeeper.property.clientPort</name>
	<value>2181</value> 
</property> 
<!--
데이터 복제 계수를 3으로 지정하였다.
-->
<property>
	<name>dfs.replication</name>
	<value>3</value>
</property>
<!--
HBase를 완전 분산모드로 사용하기로 하였다.
-->
<property>
	<name>hbase.cluster.distributed</name>
	<value>true</value>
</property>
<!--
하나의 Datanode에서 동시에 서비스 가능한 block 개수 제한.
-->
<property>
	<name>dfs.datanode.max.xcievers</name>
	<value>4096</value>
</property>


regionservers

# regionserver로 사용될 노드들의 호스트명을 적는다.
DATANODE1.local
DATANODE2.local
DATANODE3.local


이렇게 간단하게 설정을 마쳤다.
그런데 각 설정에 대한 설명을 찾다보니 내가 한 설정 항목의 일부는 최근 버전에서는 name이 바뀐 것들이
조금 있었다. 아무래도 공식 사이트를 참조하지 않고 여기 저기 블로그를 기웃거리며 정리하다보니 이런 문제가
생겨버렸다.


게다가 HBase 홒메이지의 설정 관련 항목에 하둡 버전과 그 버전에서 지원하는 HBase 버전을 표로 정리한
내용이 있는데 이상하게도 하둡 3.0.0은 목록에 있는데 내가 설치한 2.9.0은 목록에 없다… 과연 내가 설치한
하둡 2.9.0 + HBase 1.2.6은 올바른 조합인지도 잘 모르겠다…ㅠ.ㅠ 아직도 갈길이 멀다…


https://hbase.apache.org/book.html



일단 이렇게 설정을 하고 배포본을 압축하여 sap 명령으로 backup master와 regionserver들엑 복사를
해준다.


실행


실행은 간단하다. 마스터 노드에서 아래와 같이 실행하면 regionserver들도 함께 실행된다.

hadoop@NAMENODE.local $ /opt/hbase/bin/start-hbase.sh


마지막으로 backup master에서 아래와 같이 실행한다.

hadoop@SECONDARY-NAMENODE.local $ /opt/hbase/bin/master-backup.sh



확인


모든 노드에서 HBase가 정상적으로 실행되면 NAMENODE.local:16010과 SECONDARY-NAMENODE.local:16011에서 아래와 같은 화면을 볼 수 있다.




정리


설정 부분에서도 말한 바와 같이 사실 제대로 설치를 하고 설정을 하기 위해서는 공식 홈페이지만한 참고 자료가
없으나 영어의 압박으로 아무래도 한글로 정리해놓은 블로그를 주로 찾게 된다. 그러다보니 설치하는 버전도
제각각이고 설정의 내용도 제각각이어서 간혹 내가 제대로 하고 있는지 의심이 든다. 추후 공식 자료를 통해 한 번 더 
정리하는 작업을 거쳐야 제대로 된 정보의 공유라고 할 수 있을 것 같다.


이제 계획했던 작업은 Apache Spark 하나가 남았다. Spark에 대한 정리가 끝나면 본격적으로 데이터를 수집하고
저장하고 분석하는 단계를 진행해야 할텐데 이 과정에는 프로그래밍도 필요하기에 더 어려운 작업이 될 것 같다.
하지만 한편으로는 매번 시스템 설치만 해놓고 마치 장식장 안의 인형을 보듯이 지켜보기만 했는데 이제 뭔가 실질적인
작업을 해본다는 측면에서 기대가 되기도 한다.


얼른 달려보자~






블로그 이미지

마즈다

이미 마흔을 넘어섰지만 아직도 꿈을 좇고 있습니다. 그래서 그 꿈에 다가가기 위한 단편들을 하나 둘 씩 모아가고 있지요. 이 곳에 그 단편들이 모일 겁니다...^^