본문 바로가기
DATA Science/DataEngineering

json 방식 parquet 방식

by Rainbound-IT 2021. 6. 27.
반응형
    # RDS - 아티스트 ID를 가져오고
    cursor.execute("SELECT id FROM artists LIMIT 10")

    dt = datetime.utcnow().strftime("%Y-%m-%d")
    print(dt)
    sys.exit(0)



    with open('top_tracks.json', 'w') as f:
        for i in top_tracks:
            json.dump(i,f)
            f.write(os.linesep)

    s3 = boto3.resource('s3')
    object = s3.Object('data-artsts', 'dt={}/top-tracks.json'.format(dt))
    #dt 읽을수 있는 파티션으로 바꿔준느것

이건 json 파일 형식으로

 

 

    # RDS - 아티스트 ID를 가져오고
    cursor.execute("SELECT id FROM artists LIMIT 10")

    dt = datetime.utcnow().strftime("%Y-%m-%d")
    print(dt)
    sys.exit(0)



    with open('top_tracks.json', 'w') as f:
        for i in top_tracks:
            json.dump(i,f)
            f.write(os.linesep)

    s3 = boto3.resource('s3')
    object = s3.Object('data-artsts', 'dt={}/top-tracks.json'.format(dt))
    #dt 읽을수 있는 파티션으로 바꿔준느것

 

이건 parquet 형식으로

 

 

        for i in raw['tracks']:
            top_track = {}
            for k, v in top_track_keys.items():
                top_track.update({k: jsonpath.jsonpath(i, v)})
                top_track.update({'artist_id': id})
                top_tracks.append(top_track)

트랙에 들어있는 id를 jsonpath 방식으로 빼오는건데 독특했다.

https://developer.spotify.com/documentation/web-api/reference/#category-tracks

 

Web API Reference | Spotify for Developers

Music, meet code. Powerful APIs, SDKs and widgets for simple and advanced applications.

developer.spotify.com

참조해서 보자

 

 

    #tracks batch
    tracks_batch = [track_ids[i: i+100] for i in range(0, len(track_ids), 100)]

    audio_features = []
    for i in tracks_batch:

        ids = ','.join(i)
        URL = "https://api.spotify.com/v1/audio-features/?ids={}".format(ids)

        r = requests.get(URL, headers=headers)
        raw = json.loads(r.text)

        audio_features.extend(raw['audio_features'])

    audio_features = pd.DataFrame(audio_features)
    audio_features.to_parquet('audio-features.parquet', engine='pyarrow', compression='snappy')
    
    s3 = boto3.resource('s3')
    object = s3.Object('spotify-artists', 'audio-features/dt={}/top-tracks.parquet'.format(dt))
    data = open('audio-features.parquet', 'rb')
    object.put(Body=data)    

 

여태까지 한거 한번에 보여주면 

 

def main():

    try:
        conn = pymysql.connect(host, user=username, passwd=password, db=database, port=port, use_unicode=True, charset='utf8')
        cursor = conn.cursor()
    except:
        logging.error("could not connect to rds")
        sys.exit(1)


    headers = get_headers(client_id, client_secret)

    # RDS - 아티스트 ID를 가져오고
    cursor.execute("SELECT id FROM artists LIMIT 10")

    top_track_keys = {
        "id": "id",
        "name": "name",
        "popularity": "popularity",
        "external_url": "external_urls.spotify"
    }

    # Top tracks spotify 가져오고
    top_tracks = []
    for (id, ) in cursor.fetchall():

        URL = "https://api.spotify.com/v1/artists/{}/top-tracks".format(id)
        params = {
            'country': 'US'
        }
        r = requests.get(URL, params=params, headers=headers)
        raw = json.loads(r.text)


        for i in raw['tracks']:
            top_track = {}
            for k, v in top_track_keys.items():
                top_track.update({k: jsonpath.jsonpath(i, v)})
                top_track.update({'artist_id': id})
                top_tracks.append(top_track)

    # list of dictionary로 바꿀것

    # track_ids
    track_ids = [i['id'][0] for i in top_tracks]




    top_tracks = pd.DataFrame(raw)
    #top_tracks 리스트를 만들엇다 가정
    top_tracks.to_parquet('top-tracks.parquet', engine='pyarrow',compression='snappy')

    dt = datetime.utcnow().strftime("%Y-%m-%d")

    s3 = boto3.resource('s3')
    object = s3.Object('data-artists', 'top-tracks/dt={}/top-tracks.parquet'.format(dt))
    data = open('top-tracks.parquet', 'rb')
    object.put(Body=data)
    #dt 읽을수 있는 파티션으로 바꿔준느것

    #tracks batch
    tracks_batch = [track_ids[i: i+100] for i in range(0, len(track_ids), 100)]

    audio_features = []
    for i in tracks_batch:

        ids = ','.join(i)
        URL = "https://api.spotify.com/v1/audio-features/?ids={}".format(ids)

        r = requests.get(URL, headers=headers)
        raw = json.loads(r.text)

        audio_features.extend(raw['audio_features'])

    audio_features = pd.DataFrame(audio_features)
    audio_features.to_parquet('audio-features.parquet', engine='pyarrow', compression='snappy')

    s3 = boto3.resource('s3')
    object = s3.Object('data-artists', 'audio-features/dt={}/top-tracks.parquet'.format(dt))
    data = open('audio-features.parquet', 'rb')
    object.put(Body=data)

이상한게 tracks에서 가져올때 struct 때문에 예전엔 안됏던것같은데

spotify에서 수정했는지 따로 define 안해도 바로 가져올수 있게 되었다.

그래서 어떤식으로 되어있어야 안되는지 예시를 볼수 없어서 조금 아쉬운 부분이었다.

반응형

댓글