Apache Livy로 Spark Job 제출
    • PDF

    Apache Livy로 Spark Job 제출

    • PDF

    Article Summary

    Apache Livy는 REST 인터페이스를 이용하여 Spark 클러스터와 쉽게 상호작용할 수 있는 서비스입니다. 간단한 REST 인터페이스 또는 RPC(Remote Procedure Call) 클라이언트 라이브러리를 통하여 Spark Job 또는 Spark 코드 스니펫, 동기/비동기 결과 검색, SparkContext 관리를 쉽게 제출할 수 있습니다.

    또한, Apache Livy는 Spark와 애플리케이션 서버 간 상호작용을 단순화하여, 대화형 웹/모바일 애플리케이션에서 Spark를 사용할 수 있도록 도와줍니다.

    • 멀티 클라이언트에서 여러 개의 Spark Job을 사용할 수 있도록 SparkContext를 가지고 있습니다.
    • 멀티 Job 및 클라이언트에서 캐시된 RDD(Resilient Distributed Dataset) 또는 데이터 프레임을 공유합니다.
    • 멀티 SparkContext를 동시에 관리할 수 있습니다. 우수한 내결함성과 동시성을 위해 SparkContext가 Livy 서버 대신 클러스터(YARN/Mesos)에서 실행됩니다.
    • 미리 컴파일된 jar, 코드 스니펫 또는 Java/Scala 클라이언트 API를 통해 Job을 제출할 수 있습니다.
    • 보안 인증 통신을 이용하여 보안을 확보합니다.

    hadoop-chadoop-use-ex9_1-1

    참고

    이 가이드에서는 Cloud Hadoop에서 제공하는 Apache Livy를 사용하여 Spark Job을 제출하는 방법을 설명합니다.

    Python 모듈 설치

    Spark 예제 코드 수행을 위해서는 먼저 requests라는 Python 모듈을 설치해 주십시오.

    $ sudo yum install -y epel-release
    $ sudo yum install -y python-pip
    $ sudo pip install requests
    

    Apache Livy 서버 정보 확인

    Apache Livy 서버의 포트 정보는 Ambari UI에서 확인할 수 있습니다.

    1. Ambari UI에 접속한 후 Spark2 > [CONFIGS] 를 차례대로 클릭해 주십시오.
      hadoop-chadoop-use-ex9_2-1_ko

    2. Advanced livy2-conf 항목을 클릭한 후 livy.server.port 정보를 확인해 주십시오.
      hadoop-chadoop-use-ex9_2-2_ko

    Spark 예제 코드

    예제 코드는 Apache Livy Examples를 참고하여 작성하였습니다.

    • 소스코드 내용을 livy-test.py로 저장
    #-*- coding:utf-8 -*-
    
    import json, pprint, requests, textwrap, time, sys
    
    # Livy2 접속 정보 입력
    if len(sys.argv) < 2:
            print('ERROR : Livy 서버 접속 정보를 입력해 주세요')
            print(' - Usage: python {0} http://호스트명:포트'.format(sys.argv[0]))
            sys.exit(1)
    host = sys.argv[1]
    
    # 헤더 정보
    headers = {'Content-Type': 'application/json'}
    
    # Spark 세션 생성
    data = {'kind': 'spark'}
    r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    print("Created " + r.headers['location'])
    
    # Spark 세션 상태 확인
    state = "notIdle"
    session_url = host + r.headers['location']
    sys.stdout.write('Waiting for session state to idle')
    while state != 'idle':
            r = requests.get(session_url, headers=headers)
            state = r.json()['state']
            sys.stdout.write('.')
            sys.stdout.flush()
            time.sleep(1)
    sys.stdout.write('\rSessioin State is Ready!!!!!!!!!!!!!!\n')
    sys.stdout.flush()
    
    
    # 테스트 코드 1
    statements_url = session_url + '/statements'
    data = {'code': '1 + 1'}
    r = requests.post(statements_url, data=json.dumps(data), headers=headers)
    statement_url = host + r.headers['location']
    print('=' * 80)
    print(statement_url)
    print('Request: {0}'.format(data['code']))
    
    output = None
    while output == None:
            r = requests.get(statement_url, headers=headers)
            ret = r.json()
            if ret['output'] == None:
                    time.sleep(1)
                    continue
            if 'output' in ret and 'data' in ret['output']:
                    output = ret['output']['data']['text/plain']
    
    print('-' * 80)
    print(output)
    
    # 테스트 코드 2
    data = {
            'code': textwrap.dedent("""
                    val NUM_SAMPLES = 100000;
                    val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
                            val x = Math.random();
                            val y = Math.random();
                            if (x*x + y*y < 1) 1 else 0
                    }.reduce(_ + _);
                    println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
                    """)
    }
    
    r = requests.post(statements_url, data=json.dumps(data), headers=headers)
    statement_url = host + r.headers['location']
    print('=' * 80)
    print(statement_url)
    print('Request: {0}'.format(data['code']))
    
    output = None
    while output == None:
            r = requests.get(statement_url, headers=headers)
            ret = r.json()
            if ret['output'] == None:
                    time.sleep(1)
                    continue
            if 'output' in ret and 'data' in ret['output']:
                    output = ret['output']['data']['text/plain']
    
    print('-' * 80)
    print(output)
    
    # Spark 세션 종료
    print('=' * 80)
    r = requests.delete(session_url, headers=headers)
    print('{0} {1}'.format(r.json()['msg'], session_url))
    

    예제 코드인 livy-test.py 수행 시 아래와 같이 Livy 서버 접속 정보(http://ip:port)를 인자 값으로 입력해 주어야 합니다.

    $ python livy-test.py http://ip:port
    

    사용 방법은 다음과 같습니다.

    $ python livy-test.py http://172.16.3.22:8999
    Created /sessions/47
    Sessioin State is Ready!!!!!!!!!!!!!!...........................
    ================================================================================
    http://172.16.3.22:8999/sessions/47/statements/0
    Request: 1 + 1
    --------------------------------------------------------------------------------
    res0: Int = 2================================================================================
    http://172.16.3.22:8999/sessions/47/statements/1
    Request:
    val NUM_SAMPLES = 100000;
    val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
            val x = Math.random();
            val y = Math.random();
            if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _);
    println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)--------------------------------------------------------------------------------
    NUM_SAMPLES: Int = 100000
    count: Int = 78503
    Pi is roughly 3.14012================================================================================
    deleted http://172.16.3.22:8999/sessions/47
    

    이 문서가 도움이 되었습니까?

    Changing your password will log you out immediately. Use the new password to log back in.
    First name must have atleast 2 characters. Numbers and special characters are not allowed.
    Last name must have atleast 1 characters. Numbers and special characters are not allowed.
    Enter a valid email
    Enter a valid password
    Your profile has been successfully updated.