핸즈온머신러닝&딥러닝

RNN과 어텐션을 사용한 자연어 처리 1

threegopark 2021. 5. 2. 12:10
728x90

자연어 문제를 위해 많이 사용하는 방법은 순환 신경망이다. 이번 포스팅부터는 문장에서 다음 글자를 예측하도록 훈련하는 문자 단위 RNN에 대해서 살펴보고, 새로운 텍스트를 생성하고 그 과정에서 매우 긴 시퀀스를 가진 텐서플로 데이터셋을 만들어볼 것이다.

 

 

Char-RNN을 사용해 셰익스피어 문체 생성하기

 

Char-RNN 모델은 한 번에 한 글자씩 새로운 텍스트를 생성할 수 있는 RNN 모델이다. 이 모델을 활용하여 단어, 문법, 구두점 등을 학습시켜 셰익스피어의 문체를 가진 텍스트를 생성해볼 것이다.

 

1. 훈련 데이터셋 만들기

 

import tensorflow as tf
from tensorflow import keras
import numpy as np

shakespeare_url = tf.keras.utils.get_file('shakespeare.txt', 
					'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')
with open(shakespeare_url) as f:
	shakespeare_text = f.read()

케라스의 get_file 메서드를 이용하여 셰익스피어 작품을 모두 다운로드한다.

 

 

그다음 모든 글자를 정수로 인코딩한다. Tokenizer 클래스를 사용하였으며 텍스트에서 사용되는 모든 글자를 찾아 각기 다른 글자 ID에 맵핑하는 과정이다. (ID는 1부터 시작해 고유한 글자 개수까지 만들어진다.)

tokenizer = keras.preprocessing.text.Tokenizer(char_level = True)
tokenizer.fit_on_texts(shakespeare_text)

여기서 char_level = True로 지정하여 단어 수준 인코딩 대신 글자 수준 인코딩을 만든다. 이 클래스는 기본적으로 모든 텍스트를 소문자로 바꿔준다. (이를 원하지 않을 경우 lower = False 로 지정한다.)

 

그러면 위의 과정을 통해 특정 텍스트가 어떻게 변환되었는지 확인해보자.

tokenizer.texts_to_sequences(["coding test"])

글자글자마다 하나의 숫자 ID로 맵핑되었다. 그렇다면 반대로 해당 숫자를 입력하면 coding test가 출력되는지 확인해보자. (마스킹에 사용되기 때문에 1~39까지의 숫자가 사용되었다.)

tokenizer.sequences_to_texts([[19, 4, 13, 6, 10, 21, 1, 3, 2, 8, 3]])

 

내려받은 셰익스피어 작품에 몇 개의 고유 글자와 전체 글자가 있는지 확인해보자.

max_id = len(tokenizer.word_index)
print(max_id)

dataset_size = tokenizer.document_count
print(dataset_size)

전체 텍스트를 인코딩하여 각 글자를 ID값으로 변환시킨다. 여기서 앞서 마스킹때문에 숫자 ID가 1부터 시작되었다고 말했는데 이를 0부터 시작하여 38까지의 ID 값을 매핑할 수 잇도록 변환시킬 것이다.

[encoded] = np.array(tokenizer.texts_to_sequences([shakespeare_text])) - 1

 

 

2. 순차 데이터셋을 나누는 방법

 

훈련 세트, 검증 세트, 테스트 세트가 중복되지 않도록 만드는 것은 매우 중요하다. 그렇다면 시계열 데이터를 다룰 경우 데이터의 중복이 없도록 하기 위해서는 어떻게 해야할까?

 

일반적으로 시계열을 다룰 때에는 과거 훈련 세트에서 학습하는 패턴이 미래에도 비슷하게 등장한다고 가정하기 때문에 (즉, 과거의 패턴이 변하지 않는다고 가정하기 때문에) 데이터셋을 시간에 따라 나눈다. 하지만 주식데이터 등 매우 민감한 데이터도 많이 존재하기 때문에 시계열 데이터셋을 나눌때는 해당 데이터가 충분히 안정적인지 확인할 필요가 있다.

보통 시간에 따라 검증 세트에 대한 모델의 오차를 통해 이를 확인하는데, 모델이 검증 세트 마지막보다 첫 부분에서 성능이 더 좋다면 해당 시계열은 안정적이지 않다고 판단한다. 그리고 이러한 경우에는 짧은 시간 간격으로 모델을 훈련한다.

 

셰익스피어 데이터같은 경우 주식 데이터처럼 특수한 경우가 아니기 때문에 처음 90%를 훈련에, 나머지는 검증과 테스트에 사용한다.

train_size = dataset_size * 90 // 100
dataset = tf.data.Dataset.from_tensor_slices(encoded[:train_size])

 

tf.data.Dataset.from_tensor_slices() 메서드는 주어진 데이터셋(리스트, 넘파이 배열 등)을 하나의 텐서단위로 쪼개어주는 메서드이다. 즉, 한 번에 한 글자씩 반환하게 해준다.

example_1 = tf.data.Dataset.from_tensor_slices([4,2,3])

for element in example_1:
	print(element)

 

 

3. 순차 데이터를 윈도 여러 개로 자르기

 

훈련 세트는 백만 개 이상의 글자로 이루어진 시퀀스 하나이다. 이 훈련 세트를 바로 모델에 집어넣는 것은 백만 개의 층이 있는 심층 신경망에 매우 긴 훈련 샘플 하나를 넣는 것과 같이 매우 비효율적이다. 대신 데이터셋의 window() 메서드를 이용하여 이 긴 시퀀스를 작은 많은 텍스트 윈도로 변환한다. (매우 짧은 문자열로 변환)

RNN은 이렇게 짧은 부분의 문자열 길이로 나뉘게 되어도 역전파를 위해 사용될 수 있으므로 변환시키는 것이 더 현명한 방법이다.

 

n_steps = 100
window_length = n_steps + 1
dataset = dataset.window(window_length, shift = 1, drop_remainder = True)

긴 하나의 시계열을 window() 를 사용하여 부분 부분 나누었다는 것은 알겠는데... 직관적으로 어떻게 나뉜것인지 궁금하다. 해당 함수를 조금 살펴보겠다...

 

example_2 = tf.data.Dataset.range(10)
example_2 = example_2.window(5, shift = 1, drop_remainder=True)

for element in example_2:
	print(list(element.as_numpy_iterator()))

window : 그룹화 할 윈도우 크기(갯수)

drop_remainder :  남은 부분을 버릴지 살릴지 여부

shitf : 1 iterator당 몇 개씩 이동할 것인지

 

해당 예에서는 shift가 1이므로 0~4, 1~5, 2~6 등으로 이동하였고 drop_remainder=True로 설정하여 남는 부분은 버렸다. 남는 부분이란게 뭔지 궁금하여 이를False로 변경하여보았다.

 

window에서 5로 지정하였으므로 5개의 원소가 아닌 것들은 버렸다.

 

이번엔 shift의 값을 3으로 변경해보았다. 예상대로라면 0~4, 3~7 식으로 윈도가 생성되어야한다.

정답이다.

 

그러면 위에서 변환한 윈도 형태의 데이터셋을 바로 입력 값으로 사용하면 되는 것인가? 그것도 아니다. 윈도 형태의 데이터를 플랫 데이터셋으로 (Flat, 펼치다) "펼쳐줘야"하는데 그 이유에 대해서 알아보자.

 

위에서 사용한 window() 메서드는 긴 데이터셋을 나눠서 각각 하나의 데이터셋으로 표현되는 윈도를 변환해준다. 이를 중첩 데이터셋이라고 한다. 

예를 들어 window() 메소드를 이용하여 하나의 긴 데이터셋 {1, 2, 3, 4, 5, 6} 을 {{1, 2}, {3, ,4 ,5 ,6}} 의 중첩 데이터셋으로 변환할 수 있다. ({1, 2}, {3, 4, 5, 6}은 하나의 윈도이다.) 하지만 모델은 이러한 데이터셋이 아닌 하나의 텐서를 입력으로 기대하기 때문에 이러한 윈도 형태의 데이터를 Flat해서 텐서 형태로 바꿔줘야한다.

 

(배치 사이즈를 2로 하는 람다 식을 추가하였으므로 위에서 만든 [0, 1, 2, 3, 4] 의 윈도가 [0,1] [2,3] [4] 총 3개의 텐서로 변환된 모습이다.) 

example_2 = example_2.flat_map(lambda w: w.batch(2))
for d in example_2:
	print(d)

 

여기서 한 가지 의문점이 들었다. 차라리 이럴거면 뭐하러 window 데이터셋을 이용하였을까? 그냥 바로 flat_map 을 활용한다면 원하는 데이터만큼 나눌 수 있지 않을까?

만약 0부터 99까지의 데이터를 flat_map을 한다면 [0,1][2,3][4,5] --- 으로 나뉠 것이다. 그런데 위에서 생성된 텐서를 보면 [4] 라는 텐서가 보인다. 이러한 차이점이 있기 때문에 window()를 먼저 해주는 것이다.

그리고 또 한가지!

윈도 형태로 변환시키지 않고 바로 플랫하게되면

의 에러가 뜬다. 

 

 

그러면 본론으로 돌아와서... 예제 데이터셋을 평평하게 만들어보자.

dataset = dataset.flat_map(lambda window : window.batch(window_length))

윈도마다 batch(window_length)를 호출한다. 결국 이 데이터셋은 연속된 101 글자 길이의 윈도를 텐서 형태로 변환하여 가지고 있게된다. 경사 하강법은 훈련 세트 샘플이 동일 독립 분포일때 가장 잘 작동하기 때문에 이 윈도를 섞어야 한다. (동일 독립 분포일때 가장 잘 작동하는 이유는 따로 포스팅하겠다.) 그다음 윈도를 배치로 만들고 입력(처음 100개의 글자)과 타깃(마지막 글자)를 분리하겠다.

batch_size = 32
dataset = dataset.shuffle(10000).batch(batch_size)
dataset = dataset.map(lambda windows : (windows[:, :-1], windows[:, 1:]))

 

앞서 만들었던 example_2 를 활용해서 셔플에 대해 알아보자.

형태의 example_2를 셔플을 활용해 섞어보겠다.

example_2 = example_2.shuffle(buffer_size = 사용자지정).batch(2)

배치 2는 말 그대로 2개의 배치로 총 3개의 스텝으로 나눈 것이다.

buffer_size는 요소를 버퍼로 채운 다음에 버퍼에서 요소를 무작위로 샘플링하여 선택한 요소를 새 요소로 바꾼다. (완벽한 셔플링을 위해서는 데이터 세트의 전체 크기보다 크거나 같은 버퍼의 크기가 필요하다.)

 

예를 들어, 데이터 집합에 10000개의 요소가 있는 경우 buffer_size = 1000 을 설정하게 된다면 버퍼의 처음 1000개 요소 중 임의의 요소만 선택하고, 요소가 선택되면 버퍼의 빈 공간이 다음 요소, (즉, 1001번째)로 대체되어 1000개의 요소로 이루어진 버퍼를 유지한다.

 

 

그다음 확인할 메서드는 map메서드이다. 위의 map 메서드는 101개의 요소가 존재하는 텐서에서 처음 100개의 입력과 101번째의 타깃을 분리하는데 사용되었다. 이처럼 map은 각 row의 마지막 index 전까지는 훈련 데이터로, 마지막 index는 label로 활용하겠다는 의도에서 생겨난 메서드이다.

 

이 역시 예시를 통해 확인해보겠다.

batch_size = 5

ds = tf.data.Dataset.range(10)
ds = ds.window(batch_size, shift = 1, drop_remainder=True)
ds = ds.flat_map(lambda w : w.batch(batch_size))
ds = ds.shuffle(10)

ds = ds.map(lambda x: (x[:-1], x[-1:]))
for x, y in ds:
    print('train set: {}'.format(x))
    print('label set: {}'.format(y))

[3 4 5 6 7] 에서 [3 4 5 6]과 [7] 로 나뉘었다. 이를 통해서 100개의 훈련 데이터로 101번째의 단어가 무엇인지 예측하는 시계열 데이터로 활용할 수 있게 된다.

 

 

다시 본론으로 넘어와서...

일반적으로 범주형 입력 특성은 원-핫 벡터나 임베딩으로 인코딩되어야 한다. 여기에서는 고유 글자가 39개밖에 안되기 때문에 원-핫 벡터를 사용하여 인코딩한다.

dataset = dataset.map(lambda X_batch, Y_batch :
	(tf.one_hot(X_batch, depth=max_id), Y_batch))

 

마지막으로 프리패칭을 추가하여 병렬처리가 가능하도록 한다.

dataset = dataset.prefetch(1)

 

 

Char-RNN 모델을 만들고 훈련하기

 

이번엔 Char-RNN을 만들어보겠다. 이전 글자 100개를 기반으로 다음 101째 글자를 예측하기 위해 유닛 128개를 가진 GRU 층 2개와 입력과 은닉상태에 20% 드롭아웃을 사용하였다. 출력층은 TimeDistributed 클래스를 적용한 Dense 층이다.

텍스트에 있는 고유한 글자 수는 39개이므로 이 층은 39개의 유닛(max_id)을 가져야 한다. 타임 스텝마다 각 글자에 대한 확률의 합은 1이므로 dense 층에 소프트맥스 함수를 사용하였다. 그다음 sparse_categorical_crossentropy 손실과 Adam 옵티마이저를 사용하여 컴파일한다.

model = keras.models.Sequential([
    keras.layers.GRU(128, return_sequences=True, input_shape=[None, max_id], dropout=0.2, recurrent_dropout=0.2),
    keras.layers.GRU(128, return_sequences=True, dropout=0.2, recurrent_dropout=0.2),
    keras.layers.TimeDistributed(keras.layers.Dense(max_id, activation="softmax"))
])

model.compile(loss="sparse_categorical_crossentropy", optimizer = "adam")

+history = model.fit(dataset, epochs=20) 

내 노트북으로는 6시간이 걸리더라...

 

이제 생성된 모델을 활용하여 how are yo 다음에 올 u를 예측해보자.

def preprocess(texts):
    X = np.array(tokenizer.texts_to_sequences(texts))-1
    return tf.one_hot(X, max_id)
    
    
X_new = preprocess(["How are yo"])
Y_pred = model.predict_classes(X_new)
tokenizer.sequences_to_texts(Y_pred+1)[0][-1]

오우... 성공했다.