핸즈온머신러닝&딥러닝

RNN과 어텐션을 사용한 자연어 처리 2

threegopark 2021. 5. 5. 14:56
728x90

Char-RNN 모델은 새로운 텍스트를 생성하기 위해 입력값으로 먼저 초기 텍스트를 주입한다. 이 텍스트를 활용하여 모델은 그 다음에 나올 가장 가능성 있는 글자를 예측한다. 예측된 글자를 기존 텍스트에 첨가하여 다시 모델이 돌아가고 다음 글자를 예측하는 식으로 텍스트를 완성하게 된다. 

 

하지만 실제로는 같은 단어가 계속해서 반복되는 경우가 많이 발생한다. 이를 방지하기 위해 텐서플로의 tf.random.categorical() 함수를 사용할 수 있다. 해당 함수는 모델이 추정한 확률을 기반으로 다음 글자를 무작위로 선택할 수 있도록 해준다. 

 

(categorical 함수 내에는 일련의 수식이 포함되어 모델이 추정한 확률을 조금 더 확실하게 만들어주는데, 가령 predict() 함수로 0~1 사이의 확률 값을 얻었다면 categorical 함수는 이렇게 나온 결과값에 1. 로그를 취하고,    2. 0에 가까운 사용자 지정 temperature(온도)로 나누며,        3.다시 지수 함수로 복원시켜,        4.결국 0에 가까웠던 값은 더 작아지고, 1에 가까웠던 값은 더욱 1에 가까워지게 된다. (온도값이 높을수록 모든 글자가 비슷한 확률을 가지게 된다.)

 

이번에는 위의 내용들처럼 다음 글자를 선택하고 입력 텍스트에 추가하는 함수를 만들어보자. 

def next_char(text, temperature=1):
    X_new = preprocess([text])
    y_proba = model.predict(X_new)[0,-1:,:]
    rescaled_logits = tf.math.log(y_proba) / temperature
    char_id = tf.random.categoriclal(rescaled_logits, num_samples = 1) + 1
    return tokenizer.sequences_to_texts(char_id.numpy())[0]

 

그다음 next_char() 함수를 반복  호출하여 다음 글자를 얻고 텍스트에 추가하는 또 다른 함수를 만든다.

 

def complete_text(text, n_char=50, temperature=1):
    for _ in range(n_chars):
    	text += next_char(text, temperature)
    return text

 

temperature을 변경해가며 테스트해보면

print(complete_text("t", temperature=0.2))
the belly the great and who shall be the belly the

print(complete_text("w", temperature=1))
thing? or why you gremio.
who make which the first

print(complete_text("w", temperature=2))
th no cce.
yeolg-hormer firi. a play asks.
fol rusb

 

조금 더 좋은 텍스트를 생성하기위해서 이전에 만든 모델에서 GRU 층과 층의 뉴런 수를 조금 더 늘리고 더 오래 훈련하거나 규제를 추가해볼 수 있다. 정확도뿐만 아니라 이 모델은 현재 글자 100개 이상의 긴 패턴은 학습할 수 없다. 윈도를 더 크게할 수 있지만 훈련이 더 어려워진다. LSTM과 GRU 셀이라도 이처럼 매우 긴 시퀀스는 다룰 수 없다. 이를 해결하기 위해 상태가 있는 RNN을 사용한다.

 

 

 

상태가 있는 RNN

 

위의 예시로 만든 모델은 상태가 없는 RNN이다. 상태가 없는 RNN각 반복에서 무작위하게 택한 텍스트의 일부분으로 학습하고, 나머지 텍스트에서 어떤 정보도 사용하지 않는다. 즉, 훈련 반복마다 모델의 은닉 상태를 0으로 초기화한다. 타임 스텝마다 이 상태를 업데이트하고 마지막 타임 스텝 후에는 더 필요가 없기 때문이 버린다.

 

반면에 이제부터 공부할 상태가 있는 RNN훈련 반복 사이에 은닉 상태를 유지하고 중지된 곳에서 이어서 상태를 반영하는 RNN이다. 조금 더 쉽게 설명하자면 한 배치 상태를 처리한 후에 마지막 상태를 다음 훈련 배치의 초기 상태로 사용하는 것이다. 이렇게 하면 역전파는 짧은 시퀀스에서 일어나지만 모델이 장기간 패턴을 학습할 수 있게 된다. 그렇다면 이제 상태가 있는 RNN은 어떻게 만드는지 알아보겠다.

 

먼저 상태가 있는 RNN은 배치에 있는 각 입력 시퀀스가 이전 배치의 시퀀스가 끝난 지점에서 시작해야 한다. 따라서 상태가 있는 RNN을 만들기 위해 첫 번째로 순차적이고 겹치지 않는 입력 시퀀스를 만든다. 

앞선 예제에서처럼 데이터셋을 만들때 window() 메서드에서 shift=1 대신에 shift=n_steps를 사용하고, shuffle()을 사용하지 말아야한다. 

 

상태가 없는 RNN의 경우 batch(32)는 내부에 32개의 윈도우가 들어있기 때문에, 첫 번째 배치는 1~32까지의 윈도우, 두 번째 배치는 33~64까지의 윈도우를 포함하게 된다. 각 배치의 첫 번째 윈도우를 살펴보면 1과 33, 즉 연속적이지 않음을 알 수 있다. 상태가 있는 RNN은 각 배치가 연속적이여야 하므로 배치 하나에 윈도우 하나만을 갖도록 만들어 이를 해결할 수 있게 한다.

 

import numpy as np
import tensorflow as tf
from tensorflow import keras

path_to_file = tf.keras.utils.get_file('shakespeare.txt', 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')

with open(path_to_file) as f:
	shakespeare_text = f.read()


tokenizer = keras.preprocessing.text.Tokenizer(char_level = True)
tokenizer = fit_on_texts(shakespeare_text)

max_id = len(tokenizer.word_index)
dataset_size = tokenizer.document_count
batch_size = 32

[encoded] = np.array(tokeninzer.texts_to_sequences([shakespeare_text])) - 1

train_size = dataset_size*90//100
n_steps = 100
window_length = n_steps+1

encoded_parts = np.array_split(encoded[:train_size], batch_size)
datasets = []
for encoded_part in encoded_parts:
    dataset = tf.data.Dataset.from_tensor_slices(encoded_part)
    dataset = dataset.window(window_length, shift=n_steps, drop_remainder=True)
    dataset = dataset.flat_map(lambda window: window.batch(window_length))
    datasets.append(dataset)
dataset = tf.data.Dataset.zip(tuple(datasets)).map(lambda *windows: tf.stack(windows))
dataset = dataset.repeat().map(lambda windows: (windows[:, :-1], windows[:, 1:]))
dataset = dataset.map(
    lambda X_batch, Y_batch: (tf.one_hot(X_batch, depth=max_id), Y_batch))
dataset = dataset.prefetch(1)

 

이제 상태가 있는 RNN 모델을 만들어보자. 

모델 생성에 앞서 주의해야 할 점은

첫째, 각 순환 층을 만들 때 stateful=True 로 지정할 것

둘째, 상태가 있는 RNN은 배치 크기를 알아야 한다. (배치에 있는 입력 시퀀스의 상태를 보존해야 하기 때문이다.)

따라서 첫 번째 층에 batch_input_shape 매개변수를 지정해야 한다. 하지만 마찬가지로 입력은 어느 길이도 가질 수 있기 때문에 None으로 지정해줘도 된다.

 

model = keras.models.Sequential([
    keras.layers.GRU(128, return_sequences=True, stateful=True, dropout=0.2, recurrent_dropout=0.2,
    					batch_input_shape=[batch_size, None, max_id]),
    keras.layers.GRU(128, return_sequences=True, stateful=True, dropout=0.2, recurrent_dropout=0.2)
    keras.layers.TimeDistributed(keras.layers.Dense(max_id, activation="softmax"))
])

 

에포크 끝마다 텍스트를 다시 시작하기 전에 상태를 재설정해야한다. 콜백 함수를 사용하여 처리한다.

class ResetStatesCallback(keras.callbacks.Callback):
    def on_epoch_begin(self, epoch, logs):
    	self.model.reset_states()
model.compile(loss="sparse_categorical_crossentropy", optimizer = "adam")
model.fit(dataset, steps_per_epoch=steps_per_epoch, epochs=50, callbacks=[ResetStatesCallback()])