[AWS SDK] Querying Athena with the SDK (Maven)

by Rainbound-IT 2022. 6. 23.

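Looking at query results directly in the AWS Athena console works well enough,

but it is even better to be able to see them from an application such as a web app, which is why I used the SDK.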

 

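1. Security setup

Register the IAM user locally with aws configure --profile <profile name>. The sample code below calls ProfileCredentialsProvider.create(), which reads the default profile unless the AWS_PROFILE environment variable names another one.

(The user must, of course, have permissions for Athena and S3.)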

 

2. Create a Maven sample project

Create the Maven project. (The command below is for Windows; on Linux, remove the double quotes after -D.)

mvn -B archetype:generate  -D"archetypeGroupId"=org.apache.maven.archetypes  -D"groupId"=athenaJ2Example  -D"artifactId"=athenaJ2Example

 

 

3. Modify the generated pom.xml

Change the generated pom.xml as follows.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>athenaJ2Example</groupId>
    <artifactId>athenaJ2Example</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.1</version>
            </plugin>
        </plugins>
    </build>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>bom</artifactId>
                <version>2.17.190</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.8.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.8.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-commons</artifactId>
            <version>1.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-launcher</artifactId>
            <version>1.8.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>athena</artifactId>
        </dependency>
    </dependencies>
</project>

 

4. Modify App.java to query Athena, modify the test code, and add config.properties

 

Modify App.java

Generating the Maven project creates

athenaJ2Example/src/main/java/athenaJ2Example/App.java

For convenience, I renamed it to StartQueryExample.java to match the sample and pasted in the AWS sample code.

/*
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   SPDX-License-Identifier: Apache-2.0
*/

package aws.example.athena;

import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.QueryExecutionContext;
import software.amazon.awssdk.services.athena.model.ResultConfiguration;
import software.amazon.awssdk.services.athena.model.StartQueryExecutionRequest;
import software.amazon.awssdk.services.athena.model.StartQueryExecutionResponse;
import software.amazon.awssdk.services.athena.model.AthenaException;
import software.amazon.awssdk.services.athena.model.GetQueryExecutionRequest;
import software.amazon.awssdk.services.athena.model.GetQueryExecutionResponse;
import software.amazon.awssdk.services.athena.model.QueryExecutionState;
import software.amazon.awssdk.services.athena.model.GetQueryResultsRequest;
import software.amazon.awssdk.services.athena.model.GetQueryResultsResponse;
import software.amazon.awssdk.services.athena.model.ColumnInfo;
import software.amazon.awssdk.services.athena.model.Row;
import software.amazon.awssdk.services.athena.model.Datum;
import software.amazon.awssdk.services.athena.paginators.GetQueryResultsIterable;
import java.util.List;

/**
 * Before running this Java V2 code example, set up your development environment, including your credentials.
 *
 * For more information, see the following documentation topic:
 *
 * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
 */
public class StartQueryExample {

    public static void main(String[] args) throws InterruptedException {

        AthenaClient athenaClient = AthenaClient.builder()
                .region(Region.US_WEST_2)
                .credentialsProvider(ProfileCredentialsProvider.create())
                .build();

        String queryExecutionId = submitAthenaQuery(athenaClient);
        waitForQueryToComplete(athenaClient, queryExecutionId);
        processResultRows(athenaClient, queryExecutionId);
        athenaClient.close();
    }

    // Submits a sample query to Amazon Athena and returns the execution ID of the query.
    public static String submitAthenaQuery(AthenaClient athenaClient) {

        try {

            // The QueryExecutionContext allows us to set the database.
            QueryExecutionContext queryExecutionContext = QueryExecutionContext.builder()
                .database(ExampleConstants.ATHENA_DEFAULT_DATABASE).build();

            // The result configuration specifies where the results of the query should go.
            ResultConfiguration resultConfiguration = ResultConfiguration.builder()
                    .outputLocation(ExampleConstants.ATHENA_OUTPUT_BUCKET)
                    .build();

            StartQueryExecutionRequest startQueryExecutionRequest = StartQueryExecutionRequest.builder()
                    .queryString(ExampleConstants.ATHENA_SAMPLE_QUERY)
                    .queryExecutionContext(queryExecutionContext)
                    .resultConfiguration(resultConfiguration)
                    .build();

            StartQueryExecutionResponse startQueryExecutionResponse = athenaClient.startQueryExecution(startQueryExecutionRequest);
            return startQueryExecutionResponse.queryExecutionId();

        } catch (AthenaException e) {
            e.printStackTrace();
            System.exit(1);
        }
        return "";
    }

    // Wait for an Amazon Athena query to complete, fail or to be cancelled.
    public static void waitForQueryToComplete(AthenaClient athenaClient, String queryExecutionId) throws InterruptedException {
        GetQueryExecutionRequest getQueryExecutionRequest = GetQueryExecutionRequest.builder()
                .queryExecutionId(queryExecutionId).build();

        GetQueryExecutionResponse getQueryExecutionResponse;
        boolean isQueryStillRunning = true;
        while (isQueryStillRunning) {
            getQueryExecutionResponse = athenaClient.getQueryExecution(getQueryExecutionRequest);
            String queryState = getQueryExecutionResponse.queryExecution().status().state().toString();
            if (queryState.equals(QueryExecutionState.FAILED.toString())) {
                throw new RuntimeException("The Amazon Athena query failed to run with error message: " + getQueryExecutionResponse
                        .queryExecution().status().stateChangeReason());
            } else if (queryState.equals(QueryExecutionState.CANCELLED.toString())) {
                throw new RuntimeException("The Amazon Athena query was cancelled.");
            } else if (queryState.equals(QueryExecutionState.SUCCEEDED.toString())) {
                isQueryStillRunning = false;
            } else {
                // Sleep an amount of time before retrying again.
                Thread.sleep(ExampleConstants.SLEEP_AMOUNT_IN_MS);
            }
            System.out.println("The current status is: " + queryState);
        }
    }

    // This code retrieves the results of a query
    public static void processResultRows(AthenaClient athenaClient, String queryExecutionId) {

        try {

            // Max Results can be set, but if it's not set,
            // it will choose the maximum page size.
            GetQueryResultsRequest getQueryResultsRequest = GetQueryResultsRequest.builder()
                    .queryExecutionId(queryExecutionId)
                    .build();

            GetQueryResultsIterable getQueryResultsResults = athenaClient.getQueryResultsPaginator(getQueryResultsRequest);

            for (GetQueryResultsResponse result : getQueryResultsResults) {
                List<ColumnInfo> columnInfoList = result.resultSet().resultSetMetadata().columnInfo();
                List<Row> results = result.resultSet().rows();
                processRow(results, columnInfoList);
            }

        } catch (AthenaException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    private static void processRow(List<Row> row, List<ColumnInfo> columnInfoList) {

        for (Row myRow : row) {
            List<Datum> allData = myRow.data();
            for (Datum data : allData) {
                System.out.println("The value of the column is "+data.varCharValue());
            }
        }
    }
}
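The waitForQueryToComplete loop above polls at a fixed interval for as long as the query stays in a non-terminal state. As a rough sketch only, the same polling pattern can be given an upper bound on the wait. To keep the snippet runnable without AWS access, the state lookup is passed in as a Supplier<String>; in the real code that supplier would wrap athenaClient.getQueryExecution(...). The class and method names (PollingSketch, waitForTerminalState) are hypothetical and not part of the AWS sample.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

// Hypothetical helper, not part of the AWS sample: bounded polling for a terminal query state.
final class PollingSketch {

    // Polls stateSupplier until it reports SUCCEEDED, FAILED or CANCELLED and returns that state.
    // Throws IllegalStateException if the timeout elapses first or the thread is interrupted.
    static String waitForTerminalState(Supplier<String> stateSupplier,
                                       Duration pollInterval,
                                       Duration timeout) {
        long start = System.nanoTime();
        while (true) {
            String state = stateSupplier.get();
            if (state.equals("SUCCEEDED") || state.equals("FAILED") || state.equals("CANCELLED")) {
                return state;
            }
            if (System.nanoTime() - start >= timeout.toNanos()) {
                throw new IllegalStateException("Query still " + state + " after " + timeout);
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the query", e);
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for Athena: reports QUEUED, then RUNNING, then SUCCEEDED.
        Iterator<String> fakeStates = Arrays.asList("QUEUED", "RUNNING", "SUCCEEDED").iterator();
        String finalState = waitForTerminalState(fakeStates::next,
                Duration.ofMillis(10), Duration.ofSeconds(5));
        System.out.println("Final state: " + finalState);
    }
}
```

Returning the terminal state instead of throwing on FAILED or CANCELLED is a design choice made for this sketch; the AWS sample throws a RuntimeException in those cases.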

 

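Modify the test code

As with App.java, take

athenaJ2Example/src/test/java/athenaJ2Example/AppTest.java

rename AppTest to AmazonAthenaTest, and enter the following code. Note that this test also calls the other classes from the AWS sample set (CreateNamedQueryExample, ListNamedQueryExample, ListQueryExecutionsExample, DeleteNamedQueryExample, StopQueryExecutionExample), so either copy those into the aws.example.athena package as well or remove the corresponding test methods; otherwise the test will not compile.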

import aws.example.athena.*;
import org.junit.jupiter.api.*;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.athena.AthenaClient;

import java.io.*;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.*;

@TestInstance(TestInstance.Lifecycle.PER_METHOD)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class AmazonAthenaTest {

    private static AthenaClient athenaClient;
    private static String nameQuery;

    @BeforeAll
    public static void setUp() throws IOException {
        athenaClient = AthenaClient.builder()
                .region(Region.US_WEST_2)
                .credentialsProvider(ProfileCredentialsProvider.create())
                .build();

        try (InputStream input = AmazonAthenaTest.class.getClassLoader().getResourceAsStream("config.properties")) {

            Properties prop = new Properties();

            if (input == null) {
                System.out.println("Sorry, unable to find config.properties");
                return;
            }

            //load a properties file from class path, inside static method
            prop.load(input);

            // Populate the data members required for all tests
            nameQuery = prop.getProperty("nameQuery");


        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
    @Test
    @Order(1)
    public void whenInitializingAWSAthenaService_thenNotNull() {
        assertNotNull(athenaClient);
        System.out.println("Test 1 passed");
    }

    @Test
    @Order(2)
    public void CreateNamedQueryExample() {

       CreateNamedQueryExample.createNamedQuery(athenaClient, nameQuery);
       System.out.println("Test 2 passed");
    }

    @Test
    @Order(3)
    public void ListNamedQueryExample() {

       ListNamedQueryExample.listNamedQueries(athenaClient);
       System.out.println("Test 3 passed");
    }

    @Test
    @Order(4)
    public void ListQueryExecutionsExample() {

        ListQueryExecutionsExample.listQueryIds(athenaClient);
        System.out.println("Test 4 passed");
    }

    @Test
    @Order(5)
    public void DeleteNamedQueryExample()
    {
        String sampleNamedQueryId = DeleteNamedQueryExample.getNamedQueryId(athenaClient, nameQuery);
        DeleteNamedQueryExample.deleteQueryName(athenaClient, sampleNamedQueryId);
        System.out.println("Test 5 passed");

    }


    @Test
    @Order(6)
    public void StartQueryExample() {

        try {
            String queryExecutionId = StartQueryExample.submitAthenaQuery(athenaClient);
            StartQueryExample.waitForQueryToComplete(athenaClient, queryExecutionId);
            StartQueryExample.processResultRows(athenaClient, queryExecutionId);
            System.out.println("Test 6 passed");

        } catch (InterruptedException e) {
            // Restore the interrupt flag and fail the test instead of silently ignoring the interruption.
            Thread.currentThread().interrupt();
            fail(e.getMessage());
        }
    }

    @Test
    @Order(7)
    public void StopQueryExecutionExample() {

        String sampleQueryExecutionId = StopQueryExecutionExample.submitAthenaQuery(athenaClient);
        StopQueryExecutionExample.stopAthenaQuery(athenaClient, sampleQueryExecutionId);
        System.out.println("Test 7 passed");
    }
}

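Add config.properties

This file appears to be used by the test code, and it can hold any number of variables.

Here, though, I will just create the folder and file and fill it in exactly as the sample does. The test loads it with getResourceAsStream("config.properties"), so it has to sit at the root of the classpath:

athenaJ2Example/src/main/resources/config.properties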

nameQuery = sampleQuery
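To illustrate how that line is read, here is a minimal sketch using java.util.Properties, the same class the test uses. It parses the text from a string rather than from the classpath so that it runs on its own; ConfigSketch and parse are hypothetical names introduced only for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of the AWS sample: shows how a properties line is parsed.
final class ConfigSketch {

    // Parses properties-format text; whitespace around the '=' separator is ignored.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse("nameQuery = sampleQuery\n");
        System.out.println(props.getProperty("nameQuery"));              // prints "sampleQuery"
        System.out.println(props.getProperty("missingKey", "fallback")); // prints "fallback"
    }
}
```

The two-argument getProperty is a convenient way to supply a default when a key is absent, which avoids the null that the test's setUp would otherwise pass along if the file or key were missing.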

 

 

5. Maven package

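Now it is time to package.

Go into the project folder (athenaJ2Example) and run mvn package.

An error like the following occurs.

non-static variable ExampleConstants cannot be referenced from a static context symbol: variable ATHENA_DEFAULT_DATABASE (likewise ATHENA_OUTPUT_BUCKET and ATHENA_SAMPLE_QUERY)

This is to be expected: StartQueryExample refers to ExampleConstants, but we have not yet defined the query, the database, or the output location anywhere, so there is nothing to run.

Add an ExampleConstants class (ExampleConstants.java, in the same package as StartQueryExample.java) that holds the query and database values.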

 

ATHENA_OUTPUT_BUCKET: the S3 location where the query results are written

ATHENA_SAMPLE_QUERY: the query statement

ATHENA_DEFAULT_DATABASE: the name of the Athena database to query

/*
   Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
   SPDX-License-Identifier: Apache-2.0
*/

package aws.example.athena;

public class ExampleConstants {

    public static final int CLIENT_EXECUTION_TIMEOUT = 100000;
    public static final String ATHENA_OUTPUT_BUCKET = "s3://bucketscott2"; // change the Amazon S3 bucket name to match your environment
    //  Demonstrates how to query a table with a comma-separated value (CSV) table.  For information, see
    //https://docs.aws.amazon.com/athena/latest/ug/work-with-data.html
    public static final String ATHENA_SAMPLE_QUERY = "SELECT * FROM scott2;"; // change the Query statement to match your environment
    public static final long SLEEP_AMOUNT_IN_MS = 1000;
    public static final String ATHENA_DEFAULT_DATABASE = "mydatabase"; // change the database to match your database

}
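Hard-coding these values means recompiling whenever the bucket or database changes. One possible alternative, shown here only as a sketch, is to let environment variables override the defaults. The variable names are simply reused from the constants above as an assumption; nothing in the AWS sample reads them. SettingsSketch and fromEnv are hypothetical names.

```java
import java.util.Map;

// Hypothetical helper, not part of the AWS sample: environment overrides with hard-coded fallbacks.
final class SettingsSketch {

    // Returns the trimmed value of env.get(name) when it is set and non-blank, otherwise the fallback.
    static String fromEnv(Map<String, String> env, String name, String fallback) {
        String value = env.get(name);
        return (value == null || value.trim().isEmpty()) ? fallback : value.trim();
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();
        // Fallbacks are the values from ExampleConstants above.
        String database = fromEnv(env, "ATHENA_DEFAULT_DATABASE", "mydatabase");
        String output = fromEnv(env, "ATHENA_OUTPUT_BUCKET", "s3://bucketscott2");
        System.out.println("database=" + database + ", output=" + output);
    }
}
```

Passing the environment in as a Map keeps the lookup easy to test, since a test can hand in its own map instead of changing the real environment.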

 

 

 

After making the change above, mvn package fails with the following error.

java.lang.IllegalStateException: To use assumed roles in the [role NAME] profile, the 'sts' service module must be on the class path.

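This seems to be because my profile uses a role: the SDK needs the STS module on the class path in order to assume it.

It is a dependency problem, so add the following to pom.xml. The version is left out because the BOM imported under dependencyManagement already manages it, which keeps sts on the same version as the athena module.

        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>sts</artifactId>
        </dependency>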

Running mvn package again now runs the tests and the build succeeds.

 

 

Run the application

mvn exec:java -D"exec.mainClass"="aws.example.athena.StartQueryExample"


 

This produces the same output as the tests, and the query results are also saved to S3.
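One thing to watch for in that output: for SELECT queries, Athena returns the column names as the first row of the results, so the sample's processRow prints them as if they were data. The sketch below shows one way to treat the first row as a header and map the remaining rows to column name and value pairs. It uses plain lists of strings as a stand-in for the SDK's Row and Datum objects so that it runs without AWS access; ResultRowsSketch and toRecords are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the AWS sample: separates the header row from the data rows.
final class ResultRowsSketch {

    // Treats the first row as column names and maps every later row to name -> value.
    static List<Map<String, String>> toRecords(List<List<String>> rows) {
        List<Map<String, String>> records = new ArrayList<>();
        if (rows.isEmpty()) {
            return records;
        }
        List<String> header = rows.get(0);
        for (List<String> row : rows.subList(1, rows.size())) {
            Map<String, String> record = new LinkedHashMap<>();
            for (int i = 0; i < header.size() && i < row.size(); i++) {
                record.put(header.get(i), row.get(i));
            }
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        // Stand-in for one page of results: header row first, then data rows.
        List<List<String>> rows = Arrays.asList(
                Arrays.asList("id", "name"),
                Arrays.asList("1", "alice"),
                Arrays.asList("2", "bob"));
        for (Map<String, String> record : toRecords(rows)) {
            System.out.println(record);
        }
    }
}
```

With the real SDK, each inner list would be built from myRow.data() by calling varCharValue() on every Datum, and only the first page of a paginated result would carry the header row.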

 

 

 

Reference

Getting started with the AWS SDK for Java

https://docs.aws.amazon.com/ko_kr/sdk-for-java/latest/developer-guide/get-started.html

 


 

Athena code samples

https://docs.aws.amazon.com/code-samples/latest/catalog/code-catalog-javav2-example_code-athena.html

 


 

About the "the 'sts' service module must be on the class path" error

https://github.com/aws/aws-sdk-java-v2/issues/2123

 


 

 

Explanation of the AWS Athena sample

https://readosapien.com/aws-athena-start-query-execution/

 
