*글 최초 발행일자: 2018.03.29*
마이크로소프트 애저(Azure) 환경에서 빅 데이터 플랫폼을 운영하는 방법은 여러 가지입니다. 직접 호튼웍스 HDP 같은 빅 데이터 플랫폼을 올려 클러스터를 꾸리는 방법과 함께 내부에 클러스터 관리 인력이 마땅치 않다면 마이크로소프트가 제공하는 매니지스 서비스를 이용할 수도 있습니다.
Azure HDInsight란?
하둡, 스팍, 하이브, 맵리듀스, H베이스, 스톰, 카프카,
마이크로소프트 R 등 오픈 소스 기반 분석 클러스터 환경을
마이크로소프트가 책임지고 관리하는
매니지드 서비스입니다.
99.9% 수준의 SLA를 보장하기 때문에 클러스터 문제로
분석 작업에 차질이 생길까 걱정하지 않아도 됩니다.
이 서비스 이름은 그 유명은 'Azure HDInsight'입니다. 호튼웍스 HDP로 직접 꾸린 환경과 HD인사이트를 동시에 운영할 수도 있는데, 그 이유는? HD인사이트의 데이터 플랫폼은 호튼웍스 HDP입니다. 따라서 온프레미스, 클라우드, 하이브리드 어떤 구성을 하건 설정이나 운영 관련해 고민할 게 별로 없습니다.
다시 주제로 돌아와 오늘 소개할 내용은 HD인사이트 관련 개발자 가이드 문서입니다. 분량이 735페이지나 되는 PDF 문서인데요, 그냥 서점에서 입문서 하나 사는 마음으로 다운받아 읽어 보기 좋네요. 영어란 점 빼면 무료 가이드 문서 내용이 충실하다는 점, 마이크로소프트에 수고했다는 말 한마디 건네고 싶네요. 이 문서는 깃허브 사이트에서 다운로드 받을 수 있습니다.
hdinsight/hdinsight-dev-guide
hdinsight-dev-guide - HDInsight Developer Guide
github.com
이 문서는 HD인사이트 개념, 특징을 앞에서 간단히 설명한 다음 본론으로 들어갑니다. 이 책의 본론은 크게 두 부분인데요 HD인사이트 환경에서의 통합 자원 관리 관련해 아파치 암바리(Ambari)에 대한 내용이 한 파트이고 다른 한 부분은 실제 개발 시나리오와 예제 코드입니다. 맛보기 차원에서 HD인사이트 환경에서 하둡과 하이브를 사용해 트위트 데이터를 분석하는 예를 짧게 소개하겠습니다.
HD인사이트 환경에서 하둡과 하이브를 이용한 트위터 분석
본 테스트는 HD인사이트 3.6에서 이루어진 것입니다. 먼저 할 일은 데이터를 가져오는 것이죠. REST API를 통해 각각의 트윗 데이터를 JSON 문서 형식으로 가져옵니다. 일단 웹 브라우저로 https://apps.twitter.com으로 접속해 로그인합니다. 'Create New App'을 클릭한 이름, 설명, 사이트 필드 값을 입력합니다. 다음은 예제입니다.
이후 동의를 묻는 화면에서 'I Agree' 버튼을 투르고 'Create your Twitter application'를 클릭합니다. 다음에 'Permission' 탭을 여는데, 기본 설정은 읽기 전용입니다. 'Keys and Access Tokens' 탭을 열어 접근을 위한 보안 토큰을 생성합니다. 페이지 오늘쪽 상단에 위치한 'Test OAuth'를 누른 후 키, 사용자 정보 등을 입력합니다.
자, 이제 다음과 같은 파이썬 코드로 1만 개의 트윗을 받아와 tweet.txt 파일로 저장해보겠습니다. 당연한 이야기지만 HD인사이트 클러스터 환경에 파이썬이 이미 설치되어 있어야 합니다. 이제 단계별로 살펴보시죠.
우선 HD인사이트 클러스터에 접속합니다. 이때 SSH을 사용해 보안 접속을 합니다.
ssh USERNAME@CLUSTERNAME-ssh.azurehdinsight.net
그런 후 다음과 같은 명령으로 필요한 패키지를 다운받아 설치합니다.
-----------------------------------------------------------------------------------------------------------------------
sudo apt install python-dev libffi-dev libssl-dev
sudo apt remove python-openssl
pip install virtualenv
mkdir gettweets
cd gettweets
virtualenv gettweets
source gettweets/bin/activate
pip install tweepy progressbar pyOpenSSL requests[security]
-----------------------------------------------------------------------------------------------------------------------
다음 명령으로 gettweets.py 파일을 생성합니다.
nano gettweets.py
gettweets.py 파일의 컨텐츠로 다음과 같은 컨텐츠를 사용합니다.
-----------------------------------------------------------------------------------------------------------------------
#!/usr/bin/python
from tweepy import Stream, OAuthHandler
from tweepy.streaming import StreamListener
from progressbar import ProgressBar, Percentage, Bar
import json
import sys
#Twitter app information
consumer_secret='Your consumer secret'
consumer_key='Your consumer key'
access_token='Your access token'
access_token_secret='Your access token secret'
#The number of tweets we want to get
max_tweets=10000
#Create the listener class that receives and saves tweets
class listener(StreamListener):
#On init, set the counter to zero and create a progress bar
def __init__(self, api=None):
self.num_tweets = 0
self.pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=max_tweets).start()
#When data is received, do this
def on_data(self, data):
#Append the tweet to the 'tweets.txt' file
with open('tweets.txt', 'a') as tweet_file:
tweet_file.write(data)
#Increment the number of tweets
self.num_tweets += 1
#Check to see if we have hit max_tweets and exit if so
if self.num_tweets >= max_tweets:
self.pbar.finish()
sys.exit(0)
else:
#increment the progress bar
self.pbar.update(self.num_tweets)
return True
#Handle any errors that may occur
def on_error(self, status):
print status
#Get the OAuth token
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
#Use the listener class for stream processing
twitterStream = Stream(auth, listener())
#Filter for these topics
twitterStream.filter(track=["azure","cloud","hdinsight"])
-----------------------------------------------------------------------------------------------------------------------
위 내용을 복사해 붙어 넣은 후 파일을 저장하고, 다음 명령으로 트윗을 다운로드 하십시오.
python gettweets.py
이제, 파일을 받았으니 HD인사이트에 올리는 일만 남았습니다.
hdfs dfs -mkdir -p /tutorials/twitter/data
hdfs dfs -put tweets.txt /tutorials/twitter/data/tweets.txt
HiveQL 잡을 돌려 보겠습니다.
nano twitter.hql
다음 내용을 파일에 복사해 붙여 넣으십시오.
-----------------------------------------------------------------------------------------------------------------------
set hive.exec.dynamic.partition = true;
set hive.exec.dynamic.partition.mode = nonstrict;
-- Drop table, if it exists
DROP TABLE tweets_raw;
-- Create it, pointing toward the tweets logged from Twitter
CREATE EXTERNAL TABLE tweets_raw (
json_response STRING
)
STORED AS TEXTFILE LOCATION '/tutorials/twitter/data';
-- Drop and recreate the destination table
DROP TABLE tweets;
CREATE TABLE tweets
(
id BIGINT,
created_at STRING,
created_at_date STRING,
created_at_year STRING,
created_at_month STRING,
created_at_day STRING,
created_at_time STRING,
in_reply_to_user_id_str STRING,
text STRING,
contributors STRING,
retweeted STRING,
truncated STRING,
coordinates STRING,
source STRING,
retweet_count INT,
url STRING,
hashtags array<STRING>,
user_mentions array<STRING>,
first_hashtag STRING,
1. Use the following command to create a file containing HiveQL statements:
Use the following text as the contents of the file:
first_hashtag STRING,
first_user_mention STRING,
screen_name STRING,
name STRING,
followers_count INT,
listed_count INT,
friends_count INT,
lang STRING,
user_location STRING,
time_zone STRING,
profile_image_url STRING,
json_response STRING
);
-- Select tweets from the imported data, parse the JSON,
-- and insert into the tweets table
FROM tweets_raw
INSERT OVERWRITE TABLE tweets
SELECT
cast(get_json_object(json_response, '$.id_str') as BIGINT),
get_json_object(json_response, '$.created_at'),
concat(substr (get_json_object(json_response, '$.created_at'),1,10),' ',
substr (get_json_object(json_response, '$.created_at'),27,4)),
substr (get_json_object(json_response, '$.created_at'),27,4),
case substr (get_json_object(json_response, '$.created_at'),5,3)
when "Jan" then "01"
when "Feb" then "02"
when "Mar" then "03"
when "Apr" then "04"
when "May" then "05"
when "Jun" then "06"
when "Jul" then "07"
when "Aug" then "08"
when "Sep" then "09"
when "Oct" then "10"
when "Nov" then "11"
when "Dec" then "12" end,
substr (get_json_object(json_response, '$.created_at'),9,2),
substr (get_json_object(json_response, '$.created_at'),12,8),
get_json_object(json_response, '$.in_reply_to_user_id_str'),
get_json_object(json_response, '$.text'),
get_json_object(json_response, '$.contributors'),
get_json_object(json_response, '$.retweeted'),
get_json_object(json_response, '$.truncated'),
get_json_object(json_response, '$.coordinates'),
get_json_object(json_response, '$.source'),
cast (get_json_object(json_response, '$.retweet_count') as INT),
get_json_object(json_response, '$.entities.display_url'),
array(
trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
trim(lower(get_json_object(json_response, '$.entities.hashtags[1].text'))),
trim(lower(get_json_object(json_response, '$.entities.hashtags[2].text'))),
trim(lower(get_json_object(json_response, '$.entities.hashtags[3].text'))),
trim(lower(get_json_object(json_response, '$.entities.hashtags[4].text')))),
array(
trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
trim(lower(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))),
trim(lower(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))),
trim(lower(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))),
trim(lower(get_json_object(json_response, '$.entities.user_mentions[4].screen_name')))),
trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
get_json_object(json_response, '$.user.screen_name'),
get_json_object(json_response, '$.user.name'),
cast (get_json_object(json_response, '$.user.followers_count') as INT),
cast (get_json_object(json_response, '$.user.listed_count') as INT),
cast (get_json_object(json_response, '$.user.friends_count') as INT),
get_json_object(json_response, '$.user.lang'),
get_json_object(json_response, '$.user.location'),
get_json_object(json_response, '$.user.time_zone'),
get_json_object(json_response, '$.user.profile_image_url'),
Next steps
get_json_object(json_response, '$.user.profile_image_url'),
json_response
WHERE (length(json_response) > 500);
-----------------------------------------------------------------------------------------------------------------------
다음 명령으로 HiveQL을 실행하십시오.
beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http' -i twitter.hql
데이터가 잘 임포트 되었는지 다음과 같이 검증해 보십시오.
-----------------------------------------------------------------------------------------------------------------------
SELECT name, screen_name, count(1) as cc
FROM tweets
WHERE text like "%Azure%"
GROUP BY name,screen_name
ORDER BY cc DESC LIMIT 10;
-----------------------------------------------------------------------------------------------------------------------
간단하죠! 더 자세한 내용과 더 흥미로운 예제들은 가이드 문서를 통해 확인하기 바랍니다.
더 궁금한 사항은 락플레이스이 빅 데이터와 애저 전문가에게 문의 바랍니다.
*이 외에도 락플레이스의 최신 뉴스레터 소식(2021~) 을 받아보고 싶으시다면? *
*락플레이스 1:1 문의 바로가기*
락플레이스는 2005년에 출범하여 대한민국 리눅스의 역사와 성장을 함께 해온 오픈소스 전문 기업으로 국내 최다 엔지니어, 국내 최고 기술력, 국내외 유수기업 컨설팅의 풍부한 경험과 탄탄한 기술력으로 고객의 성공적인 오픈소스 플랫폼 구축과 응용 프로그램 전체 범위의 서비스를 제공하고 있습니다. 락플레이스는 레드햇코리아로부터 Elite Business Associate/Advanced Business Parter/채널 어워드 한국 탑 파트너/FY15 champion 등을 수상한 파트너며, Microsoft의 Microsoft Cloud Solution Partner(CSP)로서 함께하고 있습니다.
'OSS on Azure > 클라우드 운영 팁' 카테고리의 다른 글
정부, 지자체를 위한 최적의 클라우드의 모범을 제시한다! Azure Government (0) | 2021.03.22 |
---|---|
컨테이너 오케스트레이션의 제왕 쿠버네티스(Kubernetes)! 이제 Azure의 매니지드 서비스로 간편하게 사용 가능 (0) | 2021.03.18 |
애저 마켓플레이스 '추천 솔루션' - Azure에 올린 호튼웍스 HDP 클러스터 관리를 손쉽게 해주는 'Cloudbreak' (0) | 2021.03.18 |
스냅샷을 통한 복구 속도 개선에 대용량 디스크 백업 지원까지 ~ 애저에서 VM 백업과 복구 확실히 좋아졌다! (0) | 2021.03.18 |
중요 사이트와 서비스의 성능 보장 & 사용자 경험 관리 ~ 열일하는 Azure Traffic Manager (0) | 2021.03.18 |