
[AWS SDK] Querying Athena with the SDK (Maven)

by Rainbound-IT 2022. 6. 23.

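    You can of course open the AWS Athena console and run queries there, but being able to see query results directly inside an application (a web app, for example) is more convenient, so I used the SDK.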

     

    1. Security setup

    Register an IAM user's credentials locally with aws configure --profile <profile-name>.

    (Of course, the user needs permissions for Athena and S3.)
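    The sample code later builds its client with ProfileCredentialsProvider.create(), which reads the "default" profile unless another profile is selected. So if you registered the credentials under a named profile, either set the AWS_PROFILE environment variable to that name or use ProfileCredentialsProvider.create("<profile-name>") instead. The stdlib-only sketch below mirrors the selection order as I understand it (the aws.profile system property, then AWS_PROFILE, then "default"); it is a simplified illustration, not the SDK's own code.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified sketch (not the SDK's own code) of how the profile name used by
 * ProfileCredentialsProvider.create() is chosen: the aws.profile system property,
 * then the AWS_PROFILE environment variable, then "default".
 */
public class ProfileNameResolution {

    public static String resolveProfileName(Map<String, String> systemProperties,
                                            Map<String, String> environment) {
        String fromProperty = systemProperties.get("aws.profile");
        if (fromProperty != null) {
            return fromProperty;
        }
        String fromEnvironment = environment.get("AWS_PROFILE");
        if (fromEnvironment != null) {
            return fromEnvironment;
        }
        return "default";
    }

    public static void main(String[] args) {
        // Copy the JVM system properties into a plain String map.
        Map<String, String> properties = new HashMap<>();
        for (String name : System.getProperties().stringPropertyNames()) {
            properties.put(name, System.getProperty(name));
        }
        System.out.println("Profile that ProfileCredentialsProvider.create() would look for: "
                + resolveProfileName(properties, System.getenv()));
    }
}
```

    Running it from the same shell you use for mvn is a quick way to confirm that the profile you registered is the one the sample will pick up.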

     

    2. Create a sample Maven project

    Create a Maven project. (The command below is for Windows; on Linux the double quotes after -D are not needed and can be removed.)

    mvn -B archetype:generate  -D"archetypeGroupId"=org.apache.maven.archetypes  -D"groupId"=athenaJ2Example  -D"artifactId"=athenaJ2Example

     

     

    3. Modify the generated pom.xml

    Replace the generated pom.xml with the following.
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>athenaJ2Example</groupId>
        <artifactId>athenaJ2Example</artifactId>
        <version>1.0-SNAPSHOT</version>
        <properties>
            <maven.compiler.target>1.8</maven.compiler.target>
            <maven.compiler.source>1.8</maven.compiler.source>
        </properties>
        <build>
            <plugins>
                 <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
            </plugins>
        </build>
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>software.amazon.awssdk</groupId>
                    <artifactId>bom</artifactId>
                    <version>2.17.190</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.junit.jupiter</groupId>
                <artifactId>junit-jupiter-api</artifactId>
                <version>5.8.2</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.junit.jupiter</groupId>
                <artifactId>junit-jupiter-engine</artifactId>
                <version>5.8.2</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.junit.platform</groupId>
                <artifactId>junit-platform-commons</artifactId>
                <version>1.8.2</version>
            </dependency>
            <dependency>
                <groupId>org.junit.platform</groupId>
                <artifactId>junit-platform-launcher</artifactId>
                <version>1.8.2</version>
                <scope>test</scope>
            </dependency>
             <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>athena</artifactId>
            </dependency>
        </dependencies>
    </project>

     

    4. Replace App.java with the Athena query code, update the test code, and add config.properties

     

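    Changing App.java

    When you generate the Maven project, you will find athenaJ2Example/src/main/java/athenaJ2Example/App.java.

    For convenience, I renamed it to StartQueryExample.java to match the sample and pasted in the AWS sample code. Because the sample declares package aws.example.athena, it is tidiest to place the file under src/main/java/aws/example/athena/. The sample also hard-codes Region.US_WEST_2; change it if your Athena database lives in a different region.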
    /*
       Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
       SPDX-License-Identifier: Apache-2.0
    */
    
    package aws.example.athena;
    
    import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.athena.AthenaClient;
    import software.amazon.awssdk.services.athena.model.QueryExecutionContext;
    import software.amazon.awssdk.services.athena.model.ResultConfiguration;
    import software.amazon.awssdk.services.athena.model.StartQueryExecutionRequest;
    import software.amazon.awssdk.services.athena.model.StartQueryExecutionResponse;
    import software.amazon.awssdk.services.athena.model.AthenaException;
    import software.amazon.awssdk.services.athena.model.GetQueryExecutionRequest;
    import software.amazon.awssdk.services.athena.model.GetQueryExecutionResponse;
    import software.amazon.awssdk.services.athena.model.QueryExecutionState;
    import software.amazon.awssdk.services.athena.model.GetQueryResultsRequest;
    import software.amazon.awssdk.services.athena.model.GetQueryResultsResponse;
    import software.amazon.awssdk.services.athena.model.ColumnInfo;
    import software.amazon.awssdk.services.athena.model.Row;
    import software.amazon.awssdk.services.athena.model.Datum;
    import software.amazon.awssdk.services.athena.paginators.GetQueryResultsIterable;
    import java.util.List;
    
    /**
     * Before running this Java V2 code example, set up your development environment, including your credentials.
     *
     * For more information, see the following documentation topic:
     *
     * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
     */
    public class StartQueryExample {
    
        public static void main(String[] args) throws InterruptedException {
    
            AthenaClient athenaClient = AthenaClient.builder()
                    .region(Region.US_WEST_2)
                    .credentialsProvider(ProfileCredentialsProvider.create())
                    .build();
    
            String queryExecutionId = submitAthenaQuery(athenaClient);
            waitForQueryToComplete(athenaClient, queryExecutionId);
            processResultRows(athenaClient, queryExecutionId);
            athenaClient.close();
        }
    
        // Submits a sample query to Amazon Athena and returns the execution ID of the query.
        public static String submitAthenaQuery(AthenaClient athenaClient) {
    
            try {
    
                // The QueryExecutionContext allows us to set the database.
                QueryExecutionContext queryExecutionContext = QueryExecutionContext.builder()
                    .database(ExampleConstants.ATHENA_DEFAULT_DATABASE).build();
    
                // The result configuration specifies where the results of the query should go.
                ResultConfiguration resultConfiguration = ResultConfiguration.builder()
                        .outputLocation(ExampleConstants.ATHENA_OUTPUT_BUCKET)
                        .build();
    
                StartQueryExecutionRequest startQueryExecutionRequest = StartQueryExecutionRequest.builder()
                        .queryString(ExampleConstants.ATHENA_SAMPLE_QUERY)
                        .queryExecutionContext(queryExecutionContext)
                        .resultConfiguration(resultConfiguration)
                        .build();
    
                StartQueryExecutionResponse startQueryExecutionResponse = athenaClient.startQueryExecution(startQueryExecutionRequest);
                return startQueryExecutionResponse.queryExecutionId();
    
            } catch (AthenaException e) {
                e.printStackTrace();
                System.exit(1);
            }
            return "";
        }
    
        // Wait for an Amazon Athena query to complete, fail or to be cancelled.
        public static void waitForQueryToComplete(AthenaClient athenaClient, String queryExecutionId) throws InterruptedException {
            GetQueryExecutionRequest getQueryExecutionRequest = GetQueryExecutionRequest.builder()
                    .queryExecutionId(queryExecutionId).build();
    
            GetQueryExecutionResponse getQueryExecutionResponse;
            boolean isQueryStillRunning = true;
            while (isQueryStillRunning) {
                getQueryExecutionResponse = athenaClient.getQueryExecution(getQueryExecutionRequest);
                String queryState = getQueryExecutionResponse.queryExecution().status().state().toString();
                if (queryState.equals(QueryExecutionState.FAILED.toString())) {
                    throw new RuntimeException("The Amazon Athena query failed to run with error message: " + getQueryExecutionResponse
                            .queryExecution().status().stateChangeReason());
                } else if (queryState.equals(QueryExecutionState.CANCELLED.toString())) {
                    throw new RuntimeException("The Amazon Athena query was cancelled.");
                } else if (queryState.equals(QueryExecutionState.SUCCEEDED.toString())) {
                    isQueryStillRunning = false;
                } else {
                    // Sleep an amount of time before retrying again.
                    Thread.sleep(ExampleConstants.SLEEP_AMOUNT_IN_MS);
                }
                System.out.println("The current status is: " + queryState);
            }
        }
    
        // This code retrieves the results of a query
        public static void processResultRows(AthenaClient athenaClient, String queryExecutionId) {
    
           try {
    
               // Max Results can be set but if its not set,
               // it will choose the maximum page size.
                GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
                        .queryExecutionId(queryExecutionId)
                        .build();
    
                GetQueryResultsIterable getQueryResultsResults = athenaClient.getQueryResultsPaginator(getQueryResultsRequest);
    
                for (GetQueryResultsResponse result : getQueryResultsResults) {
                    List<ColumnInfo> columnInfoList = result.resultSet().resultSetMetadata().columnInfo();
                    List<Row> results = result.resultSet().rows();
                    processRow(results, columnInfoList);
                }
    
            } catch (AthenaException e) {
               e.printStackTrace();
               System.exit(1);
           }
        }
    
        private static void processRow(List<Row> row, List<ColumnInfo> columnInfoList) {
    
            for (Row myRow : row) {
                List<Datum> allData = myRow.data();
                for (Datum data : allData) {
                    System.out.println("The value of the column is "+data.varCharValue());
                }
            }
        }
    }
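    processRow above prints every value on its own line and ignores columnInfoList, and for a SELECT query the first row of the first page returned by GetQueryResults holds the column names rather than data. A minimal sketch of pairing values with column names and skipping that header row follows. To keep it runnable without the SDK, each inner List<String> stands in for one Row (the data.varCharValue() of each Datum), and the column names stand in for what you would read from ColumnInfo.name(); the empno/ename columns are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch: pair each result value with its column name and skip the header row.
 * Each inner List<String> stands in for one Row, holding data.varCharValue() for each Datum.
 */
public class ResultRowMapping {

    public static List<Map<String, String>> toRecords(List<String> columnNames,
                                                      List<List<String>> rows,
                                                      boolean firstRowIsHeader) {
        List<Map<String, String>> records = new ArrayList<>();
        for (int i = firstRowIsHeader ? 1 : 0; i < rows.size(); i++) {
            List<String> row = rows.get(i);
            // LinkedHashMap keeps the columns in query order.
            Map<String, String> record = new LinkedHashMap<>();
            for (int c = 0; c < columnNames.size(); c++) {
                // If a row is shorter than the column list, the missing cells become null.
                record.put(columnNames.get(c), c < row.size() ? row.get(c) : null);
            }
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        // Hypothetical first page: the header row followed by one data row.
        List<String> columns = Arrays.asList("empno", "ename");
        List<List<String>> firstPage = Arrays.asList(
                Arrays.asList("empno", "ename"),
                Arrays.asList("7369", "SMITH"));
        System.out.println(toRecords(columns, firstPage, true)); // [{empno=7369, ename=SMITH}]
    }
}
```

    Since only the first page carries the header row, pass firstRowIsHeader = true for the first GetQueryResultsResponse from the paginator and false for the rest.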

     

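    Changing the test code

    As with App.java, take athenaJ2Example/src/test/java/athenaJ2Example/AppTest.java, rename it from AppTest to AmazonAthenaTest, and enter the following code.

    Note that this test class also calls CreateNamedQueryExample, ListNamedQueryExample, ListQueryExecutionsExample, DeleteNamedQueryExample, and StopQueryExecutionExample, which are other classes from the same AWS sample set. Copy those into the aws.example.athena package as well, or delete the corresponding test methods; otherwise the test sources will not compile.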
    import aws.example.athena.*;
    import org.junit.jupiter.api.*;
    import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.athena.AthenaClient;
    
    import java.io.*;
    import java.util.Properties;
    
    import static org.junit.jupiter.api.Assertions.*;
    
    @TestInstance(TestInstance.Lifecycle.PER_METHOD)
    @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
    public class AmazonAthenaTest {
    
        private static AthenaClient athenaClient;
        private static String nameQuery;
    
        @BeforeAll
        public static void setUp() throws IOException {
            athenaClient = AthenaClient.builder()
                    .region(Region.US_WEST_2)
                    .credentialsProvider(ProfileCredentialsProvider.create())
                    .build();
    
            try (InputStream input = AmazonAthenaTest.class.getClassLoader().getResourceAsStream("config.properties")) {
    
                Properties prop = new Properties();
    
                if (input == null) {
                    System.out.println("Sorry, unable to find config.properties");
                    return;
                }
    
                //load a properties file from class path, inside static method
                prop.load(input);
    
                // Populate the data members required for all tests
                nameQuery = prop.getProperty("nameQuery");
    
    
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
        @Test
        @Order(1)
        public void whenInitializingAWSAthenaService_thenNotNull() {
            assertNotNull(athenaClient);
            System.out.println("Test 1 passed");
        }
    
        @Test
        @Order(2)
        public void CreateNamedQueryExample() {
    
           CreateNamedQueryExample.createNamedQuery(athenaClient, nameQuery);
           System.out.println("Test 2 passed");
        }
    
        @Test
        @Order(3)
        public void ListNamedQueryExample() {
    
           ListNamedQueryExample.listNamedQueries(athenaClient);
           System.out.println("Test 3 passed");
        }
    
        @Test
        @Order(4)
        public void ListQueryExecutionsExample() {
    
            ListQueryExecutionsExample.listQueryIds(athenaClient);
            System.out.println("Test 4 passed");
        }
    
        @Test
        @Order(5)
        public void DeleteNamedQueryExample()
        {
            String sampleNamedQueryId = DeleteNamedQueryExample.getNamedQueryId(athenaClient, nameQuery);
            DeleteNamedQueryExample.deleteQueryName(athenaClient, sampleNamedQueryId);
            System.out.println("Test 5 passed");
    
        }
    
    
        @Test
        @Order(6)
        public void StartQueryExample() {
    
            try {
                String queryExecutionId = StartQueryExample.submitAthenaQuery(athenaClient);
                StartQueryExample.waitForQueryToComplete(athenaClient, queryExecutionId);
                StartQueryExample.processResultRows(athenaClient, queryExecutionId);
                 System.out.println("Test 6 passed");
    
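            } catch (InterruptedException e) {
                // Restore the interrupt flag and fail the test instead of silently swallowing the exception.
                Thread.currentThread().interrupt();
                fail("Interrupted while waiting for the Athena query: " + e.getMessage());
            }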
        }
    
        @Test
        @Order(7)
        public void StopQueryExecutionExample() {
    
            String sampleQueryExecutionId = StopQueryExecutionExample.submitAthenaQuery(athenaClient);
            StopQueryExecutionExample.stopAthenaQuery(athenaClient, sampleQueryExecutionId);
            System.out.println("Test 7 passed");
        }
    }
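    Both the application and test 6 rely on waitForQueryToComplete, which keeps polling until the query reaches SUCCEEDED, FAILED, or CANCELLED, so a query that stays QUEUED or RUNNING makes them wait indefinitely. A minimal sketch of the same loop with an added deadline is below; the timeout is my addition, not part of the AWS sample, and the Supplier stands in for the getQueryExecution call so the sketch runs without the SDK.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

/**
 * Sketch: the polling loop from waitForQueryToComplete with an added deadline.
 * The Supplier stands in for
 * athenaClient.getQueryExecution(request).queryExecution().status().state().toString().
 */
public class BoundedPolling {

    public static String waitForTerminalState(Supplier<String> currentState, long sleepMs, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            String state = currentState.get();
            if ("SUCCEEDED".equals(state)) {
                return state;
            }
            if ("FAILED".equals(state) || "CANCELLED".equals(state)) {
                throw new IllegalStateException("Query ended in state " + state);
            }
            // Any other state (for example QUEUED or RUNNING): give up once the deadline has passed.
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("Query still " + state + " after " + timeoutMs + " ms");
            }
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                throw new IllegalStateException("Interrupted while waiting for the query", e);
            }
        }
    }

    public static void main(String[] args) {
        // Simulated sequence of states returned by successive polls.
        Iterator<String> states = Arrays.asList("QUEUED", "RUNNING", "SUCCEEDED").iterator();
        System.out.println(waitForTerminalState(states::next, 10, 5_000)); // SUCCEEDED
    }
}
```

    With the SDK, the Supplier would be a lambda that calls athenaClient.getQueryExecution(getQueryExecutionRequest) and returns the state string, and sleepMs could stay at ExampleConstants.SLEEP_AMOUNT_IN_MS.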

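    Adding config.properties

    It seems to be used by the test code, and you can use it to supply various variables.

    Here, though, I'll just create the folder and file and add a single entry, as in the sample. The file has to sit at the root of the classpath, because the test loads it with getResourceAsStream("config.properties"); src/main/resources/config.properties works as well.

    athenaJ2Example/src/test/resources/config.properties

    nameQuery = sampleQuery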
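    If config.properties is not found, the test's setUp only prints "Sorry, unable to find config.properties" and leaves nameQuery null, so the failure surfaces later in a less obvious place. A minimal stdlib sketch of loading the file and failing loudly on a missing file or key is below; the athena.database key is a hypothetical example of the extra variables you might move into the file.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * Sketch: load config.properties and fail loudly if the file or a key is missing,
 * rather than printing a message and leaving the value null.
 */
public class ConfigLoader {

    public static Properties load(InputStream input) {
        if (input == null) {
            throw new IllegalStateException("config.properties was not found on the classpath");
        }
        Properties props = new Properties();
        try (InputStream in = input) {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing property: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        // nameQuery comes from the post; athena.database is a hypothetical extra key.
        String text = "nameQuery = sampleQuery\nathena.database = mydatabase\n";
        Properties props = load(new ByteArrayInputStream(text.getBytes(StandardCharsets.ISO_8859_1)));
        System.out.println(require(props, "nameQuery") + " / " + require(props, "athena.database"));
    }
}
```

    In the test, setUp could then read nameQuery with ConfigLoader.require(ConfigLoader.load(AmazonAthenaTest.class.getClassLoader().getResourceAsStream("config.properties")), "nameQuery").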

     

     

    5. Maven package

    Now it's time to package the project.

    Go into the project folder (athenaJ2Example) and run mvn package.

    This fails with the following error:

    non-static variable ExampleConstants cannot be referenced from a static context (symbol: variable ATHENA_DEFAULT_DATABASE, and likewise ATHENA_OUTPUT_BUCKET and ATHENA_SAMPLE_QUERY)

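    Naturally: we haven't supplied a query yet, so what result could there be? None. The sample code refers to an ExampleConstants class holding the query, the database, and the output location, and that class doesn't exist in our project yet.

    Create ExampleConstants.java in the same package as StartQueryExample.java (aws.example.athena) and put the query and database values in it.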

     

    ATHENA_OUTPUT_BUCKET: the S3 location where query results are written

    ATHENA_SAMPLE_QUERY: the query statement

    ATHENA_DEFAULT_DATABASE: the name of the Athena database to query

    /*
       Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
       SPDX-License-Identifier: Apache-2.0
    */
    
    package aws.example.athena;
    
    public class ExampleConstants {
    
        public static final int CLIENT_EXECUTION_TIMEOUT = 100000;
        public static final String ATHENA_OUTPUT_BUCKET = "s3://bucketscott2"; // change the Amazon S3 bucket name to match your environment
        //  Demonstrates how to query a table with a comma-separated value (CSV) table.  For information, see
        //https://docs.aws.amazon.com/athena/latest/ug/work-with-data.html
        public static final String ATHENA_SAMPLE_QUERY = "SELECT * FROM scott2;"; // change the Query statement to match your environment
        public static final long SLEEP_AMOUNT_IN_MS = 1000;
        public static final String ATHENA_DEFAULT_DATABASE = "mydatabase"; // change the database to match your database
    
    }

     

     

     

    After making this change and running mvn package again, the following error occurred:

    java.lang.IllegalStateException: To use assumed roles in the [role NAME] profile, the 'sts' service module must be on the class path.

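    As the message says, this happens because the profile assumes an IAM role (it has a role_arn setting), and the SDK needs the sts module to assume that role.

    It is a missing-dependency problem, so add the following to pom.xml: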

        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>sts</artifactId>
            <!-- No <version> here: the awssdk BOM in dependencyManagement (2.17.190) manages it, keeping sts aligned with athena -->
        </dependency>

    Then run mvn package again; the tests run and the build succeeds.
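    If you want to know in advance which of your local profiles will trigger the sts requirement, look for profiles that set role_arn. A simplified stdlib sketch that scans an AWS config file for them is below; it handles plain key = value lines and the [profile name] / [name] section headers, and it is an illustration rather than a full parser of the AWS config format.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified sketch: list the profiles in an AWS config file that set role_arn.
 * Those are the profiles for which ProfileCredentialsProvider needs the sts module.
 */
public class RoleProfileCheck {

    public static List<String> profilesWithRoleArn(List<String> lines) {
        List<String> result = new ArrayList<>();
        String current = null;
        for (String raw : lines) {
            String line = raw.trim();
            if (line.startsWith("[") && line.endsWith("]")) {
                String section = line.substring(1, line.length() - 1).trim();
                // ~/.aws/config writes named profiles as [profile name]; ~/.aws/credentials uses [name].
                current = section.startsWith("profile ") ? section.substring("profile ".length()).trim() : section;
                continue;
            }
            int eq = line.indexOf('=');
            if (current != null && eq > 0 && line.substring(0, eq).trim().equals("role_arn")
                    && !result.contains(current)) {
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Path config = Paths.get(System.getProperty("user.home"), ".aws", "config");
        if (Files.exists(config)) {
            System.out.println("Profiles that assume a role: " + profilesWithRoleArn(Files.readAllLines(config)));
        } else {
            System.out.println("No config file at " + config);
        }
    }
}
```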

     

     

    Running the application

    mvn exec:java -D"exec.mainClass"="aws.example.athena.StartQueryExample"


     

    You get the same output as in the tests, and you can see that the results have been saved to S3.
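    If you want to fetch that result file from code rather than the S3 console, GetQueryExecution reports its exact location in queryExecution().resultConfiguration().outputLocation(). A minimal stdlib sketch that splits such an s3:// URI into bucket and key is below; handing those to an S3 client would need the SDK's s3 module and is not shown, and the query ID in main is hypothetical.

```java
/**
 * Sketch: split an s3:// URI such as the one returned by
 * queryExecution().resultConfiguration().outputLocation() into bucket and key.
 */
public class S3UriParts {

    /** Returns {bucket, key}; key is "" when the URI names only a bucket. */
    public static String[] split(String s3Uri) {
        final String scheme = "s3://";
        if (s3Uri == null || !s3Uri.startsWith(scheme)) {
            throw new IllegalArgumentException("Expected an s3:// URI: " + s3Uri);
        }
        String rest = s3Uri.substring(scheme.length());
        int slash = rest.indexOf('/');
        String bucket = slash < 0 ? rest : rest.substring(0, slash);
        String key = slash < 0 ? "" : rest.substring(slash + 1);
        if (bucket.isEmpty()) {
            throw new IllegalArgumentException("No bucket name in: " + s3Uri);
        }
        return new String[] {bucket, key};
    }

    public static void main(String[] args) {
        // Hypothetical output location of a finished query.
        String[] parts = split("s3://bucketscott2/example-query-id.csv");
        System.out.println("bucket=" + parts[0] + ", key=" + parts[1]); // bucket=bucketscott2, key=example-query-id.csv
    }
}
```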

     

     

     

    Reference

    Getting started with the AWS SDK for Java 2.x
    https://docs.aws.amazon.com/ko_kr/sdk-for-java/latest/developer-guide/get-started.html

    Java (SDK V2) Code Samples for Amazon Athena (AWS Code Sample catalog)
    https://docs.aws.amazon.com/code-samples/latest/catalog/code-catalog-javav2-example_code-athena.html

    On the "the 'sts' service module must be on the class path" error: STS not on class path when using parallel stream, Issue #2123, aws/aws-sdk-java-v2
    https://github.com/aws/aws-sdk-java-v2/issues/2123

    Explanation of the AWS Athena sample
    https://readosapien.com/aws-athena-start-query-execution/

     
