pythonメニュー

この内容は、書籍「TensorFlowの活用ガイド(技術評論社)」を参考に変更してWindows10で実行・実験した資料です。 元のソースや理屈は、この書籍をご参照ください。

未完成 検討中 tensorflow RNNによる深層学習

データの取得、管理

文章の分類(作者推定)

文章の生成

トークン管理用のTextEncoderクラス定義 (utils.pyのファイルで作成)

各種のTokenizerクラス」と、「TextEncoderクラス」を定義しています。
始めに基底クラス(抽象クラス)となるTokenizerを定義し、それから各種派生クラスを定義しています。
Tokenizerクラス」は、文章などの対象をトークンに分解するencodeメソッドと、トークン群を結合して文章を作るdecodeメソッドを定義しています。
これを継承して、スペースをデフォルトのdelimiterに使う「WordTokenizerクラス」、文字をトークンとして使う「CharTokenizerクラス」 、形態素解析用モジュールのMeCabを使ってトークン分解して処理する「MeCabTokenizerクラス」を定義しています。
そして、複数のトークン(語彙: vocab. : vocabulary)を管理する「TextEncoderクラス」クラスを定義している。
このTextEncoderクラスでは、build_vocabメソッドで、引数を文字列から語彙に分解して、vocab 、vocab_reversed に記憶する。 vocabは使用頻度降順のリストで、vocab_reversedはこの添え字をトークンをキーに記憶した辞書になっている。
# -*- coding:utf-8 -*-
# <"charset=UTF-8">

import abc # 抽象クラスを作る場合の基底クラス
import sys
import collections

# 抽象クラス トークン化のencode と、その逆を行うdecodeメソッドを定義
class Tokenizer(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def encode(self, text):
        raise NotImplementedError()
    
    @abc.abstractmethod
    def decode(self, tokens):
        raise NotImplementedError()

# スペースをデフォルトのdelimiterに使うencode と、decodeにオーバーライドしたTokenizer
class WordTokenizer(Tokenizer):
    def __init__(self, delimiter=' '):
        self.delimiter = delimiter
    
    def encode(self, text):
        return text.split(self.delimiter)
    
    def decode(self, tokens):
        return self.delimiter.join(tokens)

#  文字をトークンとして使うencode と、decodeにオーバーライドしたTokenizer
class CharTokenizer(Tokenizer):
    def encode(self, text):
        return list(text)
    
    def decode(self, tokens):
        return ''.join(tokens)


#  形態素解析で分解したトークンを使うencode と、decodeにオーバーライドしたTokenizer
class MeCabTokenizer(Tokenizer):
    def __init__(self, mecab_tagger=None):
        import MeCab # 形態素解析用モジュール
        #
        self.mecab_tagger = mecab_tagger if mecab_tagger else MeCab.Tagger()
        # Hack. see http://d.hatena.ne.jp/oggata/20161206
        self.mecab_tagger.parse('')
    
    def encode(self, text):
        node = self.mecab_tagger.parseToNode(text)
        while node is not None:
            feature = node.feature.split(',')
            if feature[0] != 'BOS/EOS':
                yield node.surface
            node = node.next
    
    def decode(self, tokens):
        return ''.join(tokens)

# 複数のトークン(語彙: vocab. : vocabulary)を管理するクラス
class TextEncoder(object):
    RESERVED_TOKENS = [ # 予約済トークン
        '<PAD>',# パディング用
        '<BOS>',# 文の始まり、
        '<EOS>',# 文のおわり
        '<UNK>' # 未知の語彙
    ]
    #
    def __init__(self, vocab=None, tokenizer=WordTokenizer()):
        if vocab is not None:
            self.vocab = vocab # vocabは使用頻度降順のトークンのリスト(この添え字をトークンを識別する番号(id_)として使う)
            self.vocab_reversed = self._reverse_vocab(self.vocab) # トークンをキーにして、上記の番号(id_)を記憶するディクショナリ
        else:
            self.vocab = None
            self.vocab_reversed = None
        self.num_reserved_tokens = len(self.RESERVED_TOKENS)
        self.tokenizer = tokenizer
    #
    @property
    def vocab_size(self):
        if self.vocab is not None:
            return len(self.RESERVED_TOKENS) + len(self.vocab)
        else:
            return len(self.RESERVED_TOKENS)
    #
    @staticmethod
    def _reverse_vocab(vocab):
        return {token: id_ for id_, token in enumerate(vocab)}
    
    # 語彙の構築(textsからトークン群を得て、内部に記憶)
    def build_vocab(self, texts, vocab_size=50000):
        counter = collections.Counter()
        for text in texts:
            #counter.update(self.tokenizer.encode(text))
            tmp=self.tokenizer.encode(text) #★トークンに分解
            tmp = [t.strip() for t in tmp] #★
            #print(tmp)#★
            #counter.update(tmp)#★
            counter.update(tmp)#★ 各トークンの使用数を把握 
        
        #print(counter.keys()) #★
        #print(counter.values()) #★
        len_vocab = vocab_size - len(self.RESERVED_TOKENS)
        # print("len_vocab:" + str(len_vocab) , "語彙の総数:" + str(sum(counter.values()))  ) #★
        self.counter = counter
        self.word_count = sum(counter.values()) # 全ての語彙(重複無し)の個数
        self.vocab = [token for token, count in sorted(
            counter.most_common(len_vocab),
            key=lambda x: (-x[1], x[0])
        )] # トークンは使用頻度の降順で並び替えて、vocabリストで管理
        self.vocab_reversed = self._reverse_vocab(self.vocab) # 各語彙vocabリスト
    
    # 引数の文字列から、内部に登録されるトークンを識別する番号(id_)群に変換し、そのリストを返す。
    def encode(self, text):
        encoded = []
        # 個々のトークンを一つずつ処理する繰り返し self.vocabの配列に記憶されるトークンの添え字+elf.num_reserved_tokensの番号が使われる。
        for token in self.tokenizer.encode(text):
            if token in self.vocab_reversed:
                encoded.append(self.vocab_reversed[token] + self.num_reserved_tokens)
            else:
                encoded.append(self.RESERVED_TOKENS.index('<UNK>'))
        return encoded
    
    # 引数のトークンを識別する番号(id_)のリストをトークンに変換し、繋げた文章を返す。
    def decode(self, ids):
        tokens = (self._decode_a_token(id_) for id_ in ids)
        return self.tokenizer.decode(tokens)
    
    def _decode_a_token(self, token_id):
        if token_id < self.num_reserved_tokens:
            token = self.RESERVED_TOKENS[token_id]
            if token != '<UNK>':
                token = ''
        else:
            token = self.vocab[token_id - self.num_reserved_tokens]
        return token
    #
    #----------検証用自作関数-------------------------------------------------
    #
    def print_info(self):
        print("build_vocabメソッドで文字列からトークンを構築します。")
        #print(self.counter)    
        print(self.vocab[0:20:])    
        print("・・・省略・・・・")    
        print(self.vocab[-20::])
        print("上記[self.vocab]は、" , type(self.vocab), "で、この出現数の順で番号を付けた。") 
        print("その[self.vocab_reversed]を以下で検証。(この番号はself.vocabの添え字)")
        for i in range(7):
            print("{}:{}:{}".format(i, self.vocab[i], self.vocab_reversed[self.vocab[i]]))
        print("・・・省略・・・・")
        n=len(self.vocab)-1
        for i in range(n,n-3,-1):
           print("{}:{}:{}".format(i, self.vocab[i], self.vocab_reversed[self.vocab[i]]))
        print("--------------print_info 以上\n\n")

def main(): # 上記のテスト用main
    text_encoder = TextEncoder(tokenizer=MeCabTokenizer())
    text_encoder.build_vocab('''
いつ まで 生き て て いつ 死ぬ か 解ら ない 程 、 不安 な 淋しい こと は ない と 、 お 葉 は 考へ た の で ある 。 
併 し 人間 が この世 に 生れ 出 た 其瞬 間 に 於い て 、 その 一生 が 明らか な 數字 で 表 は さ れ て あつ た なら ば 、
決定 さ れ た 淋し さ に 、 終り の 近づく 不安 さ に 、 一 日 も 力 ある 希望 に 輝い た 日 を 送る こと が 、 むづかしいかもしれない 。 
けれども お 葉 の 弱い 心 は 定め られ ない 限り ない 生 の 淋し さ に 堪へ られ なく な つたの で ある 。 
そして 三 十 三 に 死な う と 思 つた 時 、 それ が 丁度 目ざす 光明 で も ある か の やう に 、 
行方 の ない 心 の うち に ある 希望 を 求め 得 た か の やう に 、 限り ない 力 と ひそか な 喜び に 堪へ られ なかつ た の で ある 。 
''')
    text_encoder.print_info()
    ids =  text_encoder.encode('いつ まで 生き て て いつ 死ぬ か 解ら ない 程 、 不安 な 淋しい こと は ない と 、 お 葉 は 考へ た の で ある ')
    print(ids)
    print(text_encoder.decode(ids))

if __name__ == '__main__':
    main()

上記の実行結果(クラスの検証)を以下に示します。 (なお、形態素解析エンジンMeCabがインストールされている必要があります。)
build_vocabメソッドで文字列からトークンを構築します。
['な', 'い', 'の', 'に', '、', 'た', 'れ', 'か', 'あ', 'し', 'る', 'つ', 'で', 'ら', 'さ', 'て', 'と', '。', 'う', 'が']
・・・省略・・・・
['弱', '得', '思', '數', '方', '於', '時', '求', '決', '目', '瞬', '程', '終', '考', '行', '表', '解', '輝', '近', '送']
上記[self.vocab]は、 <class 'list'> で、この出現数の順で番号を付けた。
その[self.vocab_reversed]を以下で検証。(この番号はself.vocabの添え字)
0:な:0
1:い:1
2:の:2
3:に:3
4:、:4
5:た:5
6:れ:6
・・・省略・・・・
92:送:92
91:近:91
90:輝:90
--------------print_info 以上

インターネットからデータの取得(学習の前処理 preprocess.py )

以下を実行すると、青空文庫「https://github.com/aozorabunko/aozorabunko.git」からデータをダウンロードし、
「'data/raw/aozorabunko' 」のディレクトリに保存します。このデータをAIの学習などに使う予定です。
import os
import sys
import re
import subprocess # サブプロセス実行用

DEFAULT_RAW_DIR = 'data/raw/aozorabunko' # ダウンロードファイルの生データを格納するディレクトリ
AOZORA_GIT_URL = 'https://github.com/aozorabunko/aozorabunko.git' # 青空文庫のURL

import MeCab
tagger = MeCab.Tagger()
tagger.parse('')

# 引数のtextを形態素解析で分解されたトークンのジェネレータ
def tokenize(text):
    node = tagger.parseToNode(text)
    while node is not None:
        feature = node.feature.split(',')
        if feature[0] != 'BOS/EOS': # BOS(beginning of sentence)は文頭、EOS(end of sentence)は文末で、どいらでもない?
            yield node.surface
        node = node.next

import unicodedata
import lxml.html

# 引数 html_file のwebページから本文を抽出し、ルビの除去し、余計な空白を削除し、正規化したファイルを返す。
def parse_html(html_file):
    html = lxml.html.parse(html_file)
    # 本文を抽出
    main_text = html.find('//div[@class="main_text"]')
    if main_text is None:
        return None
    
    # ルビの除去
    for rt in main_text.findall('.//rt'):
        rt.getparent().remove(rt)
    for rp in main_text.findall('.//rp'):
        rp.getparent().remove(rp)
    
    # 注記と前後の不要な空白を除去
    text = re.sub(
        '[#[^]]+]\n?',
        '',
        main_text.text_content(),
        flags=(re.MULTILINE)
    ).strip()
    
    # 正規化
    text = unicodedata.normalize('NFKC', text)
    text = text.lower()
    return text


# 「git clone」のコマンドをサブプロセスで実行し、青空文庫から「DEFAULT_RAW_DIR」のディレクトリにダウンロードする。
def download_data(data_dir):
    if not os.path.exists(data_dir): # # 「data_dir」のフォルダが存在しない?
        parent_dir = os.path.dirname(data_dir)
        if not os.path.exists(parent_dir):
            os.makedirs(parent_dir) # 「data_dir」のフォルダを作成する。
        
        cmd_str = 'git clone {} {}'.format(AOZORA_GIT_URL, data_dir)
        print(cmd_str,  "のサブプロセス実行")
        ret = subprocess.call(
            cmd_str,
            shell=True
        )
        if ret != 0:
            raise RuntimeError('Failed to download dataset.')
        return True
    return False # 「data_dir」のフォルダがすでに存在する場合は、なにもしない。

print("ダウンロードファイルの格納ディレクトリ(存在しなければ生成):" + DEFAULT_RAW_DIR)
if not download_data(DEFAULT_RAW_DIR):
   print(DEFAULT_RAW_DIR,"のフォルダが既に存在したので、ダウンロードしませんでした。")

# index_fileが示す「list_person_all_extended_utf8.zip」の解凍データがcsv形式で、
#  そのcsvデータから有効な「'作品ID', 有効な相対パス」を列挙するジェネレータを返す。
def get_avilable_contents(index_file):
    import zipfile
    import pandas as pd
    z = zipfile.ZipFile(index_file)
    with z.open(z.filelist[0].filename) as i_:
        df = pd.read_csv(i_)
    
    for _, row in df.iterrows():
        content_id = row['作品ID']
        url = row['XHTML/HTMLファイルURL']
        if not isinstance(url, str): # urlがstrのインスタンスでない?
            continue
        if not url.startswith(AOZORA_SITE_URL): # urlがパス先頭が、引数と同じか?
            continue
        path = url[len(AOZORA_SITE_URL):] # ルート相対パスとなる部分文字列取得
        yield content_id, path

INDEX_FILEPATH = 'index_pages/list_person_all_extended_utf8.zip' # 青空文庫githubからダウンロードしたインデックスファイル

idx_file_path = os.path.join( DEFAULT_RAW_DIR, INDEX_FILEPATH )
print("\n青空文庫からダウンロードしたインデックスファイル", idx_file_path);

gen_id_path = get_avilable_contents( idx_file_path ) # 「作品ID と パス」のジェネレータを生成

DEFAULT_OUTPUT_DIR = 'data/parsed/aozorabunko/morph' # 形態素解析して余計の文字を含まない文章に変換した作品群のフォルダ
AOZORA_SITE_URL = 'https://www.aozora.gr.jp/' # 青空文庫のWebサイト
no_morpheme = False

print('青空文庫公式サイトにHTMLファイルが存在する作品について、',DEFAULT_RAW_DIR,'にある各ファイルを、')
print(' 形態素解析して正規化して、作品IDのtxtファイルを作るループです')
cnt = 0
for content_id, rel_path in gen_id_path:
    sys.stdout.write(' [KProcessing {}\'th file...'.format(cnt))
    cnt += 1
    fname = '{}.txt'.format(content_id)
    output_path = os.path.join(DEFAULT_OUTPUT_DIR, fname)
    raw_path = os.path.join(DEFAULT_RAW_DIR, rel_path)
    print(rel_path, " ------->> ", output_path, "を生成")
    parsed = parse_html(raw_path)
    if parsed:
        with open(output_path, 'w', encoding='utf-8') as o_:
            if no_morpheme:
                print(parsed, file=o_)
            else:
                for line in parsed.split('\n'):
                    print(' '.join(tokenize(line)), file=o_)

print('Done.')


実行結果例を示します。(私の環境では、ダウンロードに約30分、正規化したテキストファイル生成に約30分で、約1時間の処理でした。)
ダウンロードファイルの格納ディレクトリ(存在しなければ生成):data/raw/aozorabunko
git clone https://github.com/aozorabunko/aozorabunko.git data/raw/aozorabunko のサブプロセス実行
Cloning into 'data/raw/aozorabunko'...
remote: Enumerating objects: 180, done.
remote: Counting objects: 100% (180/180), done.
remote: Compressing objects: 100% (150/150), done.
remote: Total 366379 (delta 134), reused 64 (delta 30), pack-reused 366199R
Receiving objects: 100% (366379/366379), 11.07 GiB | 10.95 MiB/s, done.
Resolving deltas: 100% (297333/297333), done.
Checking out files: 100% (63626/63626), done.

青空文庫からダウンロードしたインデックスファイル data/raw/aozorabunko\index_pages/list_person_all_extended_utf8.zip
青空文庫公式サイトにHTMLファイルが存在する作品について、 data/raw/aozorabunko にある各ファイルを、
 形態素解析して正規化して、作品IDのtxtファイルを作るループです
 [KProcessing 0'th file...26
cards/001257/files/56078_51422.html  ------->>  data/parsed/aozorabunko/morph\56078.txt を生成
 [KProcessing 1'th file...26
cards/001257/files/56033_50983.html  ------->>  data/parsed/aozorabunko/morph\56033.txt を生成
 [KProcessing 2'th file...26
cards/001257/files/46658_44767.html  ------->>  data/parsed/aozorabunko/morph\46658.txt を生成
 [KProcessing 3'th file...26
cards/001234/files/46340_24939.html  ------->>  data/parsed/aozorabunko/morph\46340.txt を生成
・・・・・記述省略・・・・・・
cards/001395/files/55622_60230.html  ------->>  data/parsed/aozorabunko/morph\55622.txt を生成
 [KProcessing 15755'th file...30
cards/001395/files/49876_45587.html  ------->>  data/parsed/aozorabunko/morph\49876.txt を生成
 [KProcessing 15756'th file...30
cards/001395/files/49913_42907.html  ------->>  data/parsed/aozorabunko/morph\49913.txt を生成
 [KProcessing 15757'th file...30
cards/001395/files/49914_41917.html  ------->>  data/parsed/aozorabunko/morph\49914.txt を生成
 [KProcessing 15758'th file...30
cards/001185/files/45210_24671.html  ------->>  data/parsed/aozorabunko/morph\45210.txt を生成
>>> print('Done.')
上記では、15758個のテキストファイルが得られました。(青空文庫の状況によって変わる数です。)