Apache Livy로 Spark Job 제출
  • PDF

Apache Livy로 Spark Job 제출

  • PDF

VPC 환경에서 이용 가능합니다.

Apache Livy는 REST 인터페이스를 이용하여 Spark 클러스터와 쉽게 상호작용할 수 있는 서비스입니다. 간단한 REST 인터페이스 또는 RPC 클라이언트 라이브러리를 통해 Spark Job 또는 Spark 코드 스니펫, 동기/비동기 결과 검색, Spark Context 관리를 쉽게 제출할 수 있습니다.

또한, Apache Livy는 Spark와 애플리케이션 서버 간의 상호작용을 단순화하여 대화형 웹/모바일 애플리케이션에 Spark를 사용할 수 있도록 도와줍니다.

  • 멀티 클라이언트에서 여러 개의 Spark Job을 사용할 수 있도록 Spark Context를 가지고 있습니다.
  • 멀티 Job 및 클라이언트에서 캐시된 RDD 또는 데이터 프레임을 공유합니다.
  • 멀티 Spark Context를 동시에 관리할 수 있으며, 우수한 내결함성과 동시성을 위해 Spark Context가 Livy Server 대신 클러스터(Yarn/Mesos)에서 실행됩니다.
  • Job은 미리 컴파일된 jar, 코드 스니펫 또는 java/scala 클라이언트 API를 통해 제출할 수 있습니다.
  • 보안 인증 통신을 이용하여 보안을 확보합니다.

hadoop-chadoop-use-ex9_1-1_ko

참고
  • Apache Livy에 대한 자세한 내용은 Apache Livy 홈페이지 를 참고해 주십시오.
  • 이미지 출처: https://livy.incubator.apache.org/assets/images/livy-architecture.png

이 가이드에서는 Cloud adoop Spark에서 제공하는 Apache Livy를 사용하여 Spark Job을 제출하는 방법을 설명합니다.

Python 모듈 설치

Spark 예제 코드 수행을 위해서는 먼저 requests라는 Python 모듈을 설치해 주십시오.

$ sudo yum install -y epel-release
$ sudo yum install -y python-pip
$ sudo pip install requests

Apache Livy 서버 정보 확인

Apache Livy 서버의 포트 정보는 Ambari UI에서 확인할 수 있습니다.

  1. Ambari UI에 접속한 후 Spark2 > CONFIGS를 차례대로 클릭해 주십시오.

hadoop-chadoop-use-ex9_2-1_ko

  1. Advanced livy2-conf 항목을 클릭한 후 livy.server.port 정보를 확인해 주십시오.

hadoop-chadoop-use-ex9_2-2_ko

Spark 예제 코드

예제 코드는 Apache Livy Examples를 참고하여 작성하였습니다.

  • 소스 코드 내용을 livy-test.py로 저장
#-*- coding:utf-8 -*-

import json, pprint, requests, textwrap, time, sys

# Livy2 접속 정보 입력
if len(sys.argv) < 2:
        print('ERROR : Livy 서버 접속 정보를 입력해 주세요')
        print(' - Usage: python {0} http://호스트명:포트'.format(sys.argv[0]))
        sys.exit(1)
host = sys.argv[1]

# 헤더 정보
headers = {'Content-Type': 'application/json'}

# Spark 세션 생성
data = {'kind': 'spark'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
print("Created " + r.headers['location'])

# Spark 세션 상태 확인
state = "notIdle"
session_url = host + r.headers['location']
sys.stdout.write('Waiting for session state to idle')
while state != 'idle':
        r = requests.get(session_url, headers=headers)
        state = r.json()['state']
        sys.stdout.write('.')
        sys.stdout.flush()
        time.sleep(1)
sys.stdout.write('\rSessioin State is Ready!!!!!!!!!!!!!!\n')
sys.stdout.flush()


# 테스트 코드 1
statements_url = session_url + '/statements'
data = {'code': '1 + 1'}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
statement_url = host + r.headers['location']
print('=' * 80)
print(statement_url)
print('Request: {0}'.format(data['code']))

output = None
while output == None:
        r = requests.get(statement_url, headers=headers)
        ret = r.json()
        if ret['output'] == None:
                time.sleep(1)
                continue
        if 'output' in ret and 'data' in ret['output']:
                output = ret['output']['data']['text/plain']

print('-' * 80)
print(output)

# 테스트 코드 2
data = {
        'code': textwrap.dedent("""
                val NUM_SAMPLES = 100000;
                val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
                        val x = Math.random();
                        val y = Math.random();
                        if (x*x + y*y < 1) 1 else 0
                }.reduce(_ + _);
                println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
                """)
}

r = requests.post(statements_url, data=json.dumps(data), headers=headers)
statement_url = host + r.headers['location']
print('=' * 80)
print(statement_url)
print('Request: {0}'.format(data['code']))

output = None
while output == None:
        r = requests.get(statement_url, headers=headers)
        ret = r.json()
        if ret['output'] == None:
                time.sleep(1)
                continue
        if 'output' in ret and 'data' in ret['output']:
                output = ret['output']['data']['text/plain']

print('-' * 80)
print(output)

# Spark 세션 종료
print('=' * 80)
r = requests.delete(session_url, headers=headers)
print('{0} {1}'.format(r.json()['msg'], session_url))

예제 코드인 livy-test.py 수행 시에는 아래와 같이 Livy 서버 접속 정보(http://ip:port)를 인자 값으로 입력해 주어야 합니다.

$ python livy-test.py http://ip:port

사용 방법은 아래와 같습니다.

$ python livy-test.py http://172.16.3.22:8999
Created /sessions/47
Sessioin State is Ready!!!!!!!!!!!!!!...........................
================================================================================
http://172.16.3.22:8999/sessions/47/statements/0
Request: 1 + 1
--------------------------------------------------------------------------------
res0: Int = 2================================================================================
http://172.16.3.22:8999/sessions/47/statements/1
Request:
val NUM_SAMPLES = 100000;
val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
        val x = Math.random();
        val y = Math.random();
        if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _);
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)--------------------------------------------------------------------------------
NUM_SAMPLES: Int = 100000
count: Int = 78503
Pi is roughly 3.14012================================================================================
deleted http://172.16.3.22:8999/sessions/47

이 글이 도움이 되었나요?

What's Next