UMEHOSHI ITA TOP PAGE COMPUTER SHIEN LAB
アルゴリズム(仕様)はここを参照)
''' umcmprs.py
UME連続可逆圧縮 (UME COMPRESSの略名:UMCMPRS)
「UMEHOSHI ITA」基板(https://manabu.quu.cc/up/ume/)で、
USB転送能力を補うために作ったものですが、他にも利用できるでしょう。
(自由にご利用していただいて結構です。
可能な限り下記内容の"umcmprs.py"のモジュール内に上記コメントを含めてご利用ください。)
'''
#from umedebug import debug_hex4
#dflags =[False]*20
# dflags[0]=debug_hex4(nn=0, data=0b0001, enable=True)
# dflags[1]=debug_hex4(1, f"{"++++++++"}", dflags[0])
#from IPython.core.debugger import Pdb; Pdb().set_trace()
''' 1〜16bitの指定のデータを詰めてbyteごとにcallbackを実行する。'''
class Bit2Byte:
count_byte=0
def __init__(self, callback ):
self.bit32=0 # bit設定の作業用変数
self.bitcount=0 # 上記への記憶bit数。 8bit毎にcallback処理
self.callback=callback
#
def setdata(self, data, bitsize): # bit列をbyte列に変換 ビット列構築用で使う。
'''bitsizeのビット数で、dataを記憶 1byte分に詰められたタイミングで callbackを呼ぶ'''
bitmask = (1<<bitsize)-1
if data > bitmask : raise Exception(f"{data} does not fit in {bitsize} bit")
self.bit32 |= data << self.bitcount
self.bitcount += bitsize
while self.bitcount >= 8:
self.callback(self.bit32 & 0x0ff)
Bit2Byte.count_byte+=1
self.bit32 >>= 8
self.bitcount-=8
#
def padding(self): # ビット列構築用で使う
''' bit_sizeサイズが、丁度byteに達するように0で埋める'''
if self.bitcount != 0:
self.setdata(0, 8-self.bitcount) # bit列をbyte列に変換
''' byteのデータ群から指定のビット長を取り出す '''
class Byte2Bit:
def __init__(self, bytedata:bytes ): # 圧縮データを引数するコンストラクタ
self.byte_buf = bytearray(bytedata)
self.byte_buf_len=len(bytedata) # 上記のbit記憶数
self.bit32=0 # bit設定の作業用変数(0〜8)
self.bitcount=0 # 上記へのbit記憶数。
#
def getdata(self,bitsize): # ビット列復元用で使う
'''bitsizeのビット数の値を取り出して返す '''
rv = (self.byte_buf_len << 3) + self.bitcount - bitsize
if rv < 0 : return rv # 負の値(足りないビット数)
bitmask = (1<<bitsize)-1
while self.bitcount < bitsize and self.byte_buf_len>0:
byte0 = self.byte_buf.pop(0)
self.byte_buf_len-=1
self.bit32 |= byte0 << self.bitcount
self.bitcount += 8
val = self.bit32 & bitmask
self.bit32 >>= bitsize
self.bitcount -= bitsize
return val
#
def append(self, bytedata:bytes ): # 圧縮データを追加
for i in range( len( bytedata ) ):
self.byte_buf.append( bytedata[i] )
self.byte_buf_len+=1 # 上記のbit記憶数
def to_uint(val, bitsize): # bitsizeビット長の符号ありvalを、符号なしに変換する
return val & (1<<bitsize)-1
def to_int(uVal, bitsize): # bitsizeビット長の符号無しuValを符号ありに変換
sign_bit=1<<(bitsize-1)
return uVal if uVal < sign_bit else uVal-(sign_bit<<1)
class ChbBlock:
debug = False
SBC_NUMB=15 # パス1用 同一ビットサイズの連続個数(この数以上並ぶ場合にビットサイズを縮小)
bit_bsize=3
bit_range=[7,9,11,13,15,16] # 「ビット長のテーブル」
BB_size=5 # 上記「ビット長のテーブル」要素のバイナリ記憶時のビットサイズ
min_s = [ -(2**(v-1)) for v in bit_range ]
max_s = [ 2**(v-1)-1 for v in bit_range ]
#
@classmethod
def initialize(cls):
cls.prev_CHB = None # get_sequence_bit_indexで利用する前のCHB
cls.prev_bit_range_idx=5
cls.prev_data=0
cls.flag_size_confirmed=False # prev_CHBにサイズ変更のCHBが生成設定時にFalseで、その送出が終わったらTrue
cls.data_buffer = None # 先頭データでget_sequence_bit_indexを使う時に記憶
cls.index_buff = 0
cls.i32 = 0 # 次に続くDABやBABの数で、直後がCHBの場合は0になる
cls.bitbyte = None # 圧縮用Bit2Byteの設定
#
def __init__(self, bit_size_idx, buffer_idx, dict_idx = -1):
self.bit_size_idx = bit_size_idx # サイズ変更
self.buffer_idx = buffer_idx
self.dict_idx = dict_idx # 辞書参照の添え字
#
def out_CHB_bitimage(self, index_buff=0):
n_word = 0
if ChbBlock.i32 == 32:
if ChbBlock.bitbyte :ChbBlock.bitbyte.setdata( 0 , 3 )# サイズ変更無しで32個のデータが続く
if ChbBlock.debug: print(f" [CHB] {32}個")
elif self.dict_idx != -1:
n_word = 1 # 参照ワード数
for idxtbl in range(len(DuplicateCount.idxs)):
if DuplicateCount.idxs[idxtbl].idx == self.dict_idx: break
if DuplicateCount.idxs[idxtbl].count2 >= DuplicateCount.idxs[idxtbl].count1: n_word = 2
if ChbBlock.bitbyte :
ChbBlock.bitbyte.setdata( 0b111, 3) # 辞書テーブル
ChbBlock.bitbyte.setdata( idxtbl , 5 ) # 辞書テーブルの添え字
ChbBlock.bitbyte.setdata( ChbBlock.i32 , 5 )
if ChbBlock.debug:
print(f"[{self.buffer_idx:4}][CHB] :{ChbBlock.data_buffer[self.dict_idx]:5}", end="")
if n_word == 2:
print(f" ,{ChbBlock.data_buffer[self.dict_idx+1]}", end="")
print(f" [{self.dict_idx:4}] dict_idx={idxtbl} の辞書参照 直後のデータ{ChbBlock.i32}個 ")
else:
if ChbBlock.bitbyte :
ChbBlock.bitbyte.setdata( self.bit_size_idx+1, 3)
ChbBlock.bitbyte.setdata( ChbBlock.i32 , 5 )
if ChbBlock.debug: print(f" [CHB] bit size={ChbBlock.bit_range[self.bit_size_idx]} {ChbBlock.i32}個")
return n_word # 辞書の場合の参照ワード数、辞書でない場合は0
#
@classmethod
def is_fit(cls,value, bit_range_idx):
''' 符号付valueが、min_s[bit_range_idx]以上で、max_s[bit_range_idx]以下であるならTrue '''
if bit_range_idx==5 and value < 0: return False
if bit_range_idx==5 and value <= cls.max_s[5]-cls.min_s[5]: return True
if value < 0: value = -value-1
if cls.min_s[bit_range_idx] > value: return False
if cls.max_s[bit_range_idx] < value: return False
return True
#
@classmethod
def get_bit_index(cls, value:int):
''' valueを記憶するための最小ビットサイズを指し示す ChbBlock.bit_range 配列の添え字を返します。'''
bit_range_idx=0
while bit_range_idx < len(cls.bit_range)-1:
if cls.is_fit(value, bit_range_idx) : return bit_range_idx
bit_range_idx += 1
return bit_range_idx
#
@classmethod
def flush_i32(cls, n_word ):
''' cls.i32 個のデータの出力 '''
idx = cls.prev_CHB.buffer_idx + n_word
prev_base_val = cls.data_buffer[idx-1]
for n in range(cls.i32):
base_val = cls.data_buffer[idx+n]
bitsize=ChbBlock.bit_range[cls.prev_CHB.bit_size_idx]
if cls.prev_CHB.bit_size_idx == 5:
if cls.is_fit(base_val, cls.prev_CHB.bit_size_idx) == False:
raise Exception(f"not match get_sequence_bit_index(value={base_val},bit_range_idx={cls.prev_CHB.bit_size_idx})")
if cls.bitbyte :cls.bitbyte.setdata( base_val , bitsize ) # ベースビット長で保存
if cls.debug:print(f"[{idx+n :4}][BAB] :{ base_val:5}, i32={n}")
else:
diff = base_val - prev_base_val
if cls.is_fit(diff, cls.prev_CHB.bit_size_idx) == False:
raise Exception(f"not match get_sequence_bit_index(value={base_val},bit_range_idx={diff})")
diff2 = to_uint( diff , bitsize) # 差分
if cls.bitbyte :cls.bitbyte.setdata( diff2 , bitsize ) # ベースビット長で差分保存
if cls.debug:print(f"[{idx+n :4}][DAB] :{ base_val:5},{bitsize}bit,diff={diff}, i32={n}")
prev_base_val = base_val
#
cls.i32=0
#
@classmethod
def get_sequence_bit_index(cls, uint_data, data_buffer=None, bit2byte:Bit2Byte=None):
''' data_buffer!=Noneならget_sequence_bit_index利用シーケンスの先頭要素の処理で、
data_bufferやbit2byteの登録処理になる。
先頭引数uint_dataだけ(data_buffer==None)の呼び出しは、残りの要素の処理になる。
bitByte2=None 指定は、uint_dataをcls.data_bufferに記憶する処理で、set_dupli_count関数で使われる。
のこ場合は、別途で辞書候補用の重複数カウント処理が併用される。
bitByte2!=None 指定はuint_dataをbitByte2に埋め込む処理で、ume_compress_out関数が使われる。
のこ場合は、別途で辞書候補用リストの参照をbitByte2に埋め込む処理が併用される。
戻り値は暫定ビットサイズです。
'''
#if cls.debug:print(f"UINT INPUT:{uint_data}")
if data_buffer != None : # 先頭データの場合
cls.index_buff = 0
cls.i32 = 0
cls.flag_size_confirmed=False # サイズ変更のCHBが生成時にFalseで、その送出が終わったらTrue
cls.prev_bit_range_idx=len(cls.bit_range)-1 # get_bit_lengthメソッド実行で、以前のビット長を記憶
bitsize= ChbBlock.bit_range[ cls.prev_bit_range_idx ]
cls.prev_data = uint_data
cls.data_buffer = data_buffer
cls.bitbyte=bit2byte
if cls.bitbyte == None:
cls.data_buffer[cls.index_buff]=uint_data # パス1の記憶
else:
cls.bitbyte.setdata( to_uint( cls.data_buffer[cls.index_buff] , bitsize) ,bitsize)
if cls.debug:print(f"[{cls.index_buff :4}][BAB] :{ cls.data_buffer[cls.index_buff]:5}")
return cls.prev_bit_range_idx
#
cls.index_buff+=1
if cls.bitbyte == None:
cls.data_buffer[cls.index_buff]=uint_data # パス1の記憶
diff = uint_data - cls.prev_data
bit_size_idx = cls.get_bit_index(diff)
cls.prev_data = uint_data
cls.i32 += 1
flush_flag = False
if cls.i32 == 1 and cls.prev_CHB is None:
cls.prev_CHB = ChbBlock(bit_size_idx, cls.index_buff) # 暫定CHB生成
if cls.prev_bit_range_idx != bit_size_idx: cls.flag_size_confirmed=False
cls.prev_bit_range_idx = bit_size_idx
elif bit_size_idx > cls.prev_bit_range_idx:#ビットサイズを大きくする必要がある?
if cls.i32 < ChbBlock.SBC_NUMB and cls.prev_CHB.dict_idx == -1:
cls.prev_CHB.bit_size_idx = bit_size_idx # 前のCHBのビットサイズ大きく変更
cls.prev_bit_range_idx = bit_size_idx
cls.flag_size_confirmed = False
if cls.i32 >= 31: flush_flag = True
else: # cls.prev_CHBが辞書または、cls.i32 >= ChbBlock.SBC_NUMB の場合
cls.i32 -= 1
n_word=cls.prev_CHB.out_CHB_bitimage() # 前の辞書を出力
cls.flush_i32(n_word)
cls.i32 += 1
cls.prev_CHB = ChbBlock(bit_size_idx, cls.index_buff) # 暫定CHB生成
if cls.prev_bit_range_idx != bit_size_idx: cls.flag_size_confirmed=False
cls.prev_bit_range_idx = bit_size_idx
elif (not cls.flag_size_confirmed or cls.prev_CHB.dict_idx != -1)and cls.i32 == 31:
flush_flag = True
elif cls.flag_size_confirmed and cls.i32 == 32:
flush_flag = True
elif bit_size_idx < cls.prev_bit_range_idx and cls.i32 >= ChbBlock.SBC_NUMB:
cls.flag_size_confirmed = False
flush_flag = True # ビットサイズを小さくする出力指示。
elif bit_size_idx < cls.prev_bit_range_idx and \
cls.prev_bit_range_idx == 5 and cls.i32 >= 4:
cls.flag_size_confirmed = False
flush_flag = True # ベースビット長時のビットサイズを小さくする出力指示(4個で判定)。
#
if flush_flag:
n_word=cls.prev_CHB.out_CHB_bitimage()
cls.flush_i32(n_word)
if cls.prev_CHB.i32 != 32 and cls.prev_CHB.dict_idx == -1: cls.flag_size_confirmed=True
ChbBlock.prev_CHB = None
#
return cls.prev_bit_range_idx
#
@classmethod
def get_sequence_bit_index_flush(cls):
if cls.prev_CHB :
n_word=cls.prev_CHB.out_CHB_bitimage()
cls.flush_i32(n_word)
ChbBlock.prev_CHB = None
if cls.bitbyte :cls.bitbyte.setdata( 6 , 3 )# 終了 CHB 送出
if cls.bitbyte :cls.bitbyte.setdata( 0 , 5 )
if ChbBlock.debug: print(f" [CHB] bit size={ChbBlock.bit_range[5]} {0}個")
if cls.bitbyte :cls.bitbyte.padding() # byteに足りないbit列を0で埋める
return 5
#
@classmethod
def get_header_bitbyte(cls, idxs, datas, callback)->Bit2Byte:
''' idxsは、DuplicateCountの辞書参照用リスト '''
if ChbBlock.debug: print("------------------------------ create header")
bib=Bit2Byte( callback ) #ヘッド部圧縮用バイナリ用
bib.setdata( 0 , 1) #1ビットで0を記憶([Ver])
bib.setdata( ChbBlock.BB_size , 5) # ビット長記憶域のビットサイズを5ビットで記憶
for i in range(5, -1, -1):
bib.setdata(cls.bit_range[i]-1, ChbBlock.BB_size) # ChbBlock.BB_size ビットで記憶
if ChbBlock.debug: print(cls.bit_range[i] , end=" ")
if ChbBlock.debug: print( " :Used bit length")
# 先頭の辞書データの記憶
bib.setdata( len( idxs ), 5) # 辞書データの個数 (0〜31)
if ChbBlock.debug: print(f"DIB(辞書ブロック)の個数:{ len( idxs ) }")
for i in range( len( idxs ) ):
n_bit = 1 if idxs[i].count1 <= idxs[i].count2 else 0
idx_buf=idxs[i].idx
if idx_buf+1 == len(datas) : n_bit = 0
if n_bit == 0: bib.setdata( 0 , 1) # 0記憶( 辞書ワードサイズが1 )
else:
bib.setdata( 1 , 1) # 10を記憶( 辞書ワードサイズが2 )
bib.setdata( 0 , 1) # 10を記憶( 辞書ワードサイズが2 )
if ChbBlock.debug: print(f"DIB[{i}]:使用数:{max(idxs[i].count1,idxs[i].count2)}個",end="")
if ChbBlock.debug: print(f" 値:{datas[idx_buf]}",end="")
bib.setdata( datas[idx_buf] , cls.bit_range[5]) # 辞書データの記憶
if n_bit == 1 :
if ChbBlock.debug: print(f", {datas[idx_buf+1]} ",end="")
bib.setdata( datas[idx_buf+1] , cls.bit_range[5]) # 辞書データの記憶
if ChbBlock.debug: print()
if ChbBlock.debug: print(f"-----------DIB(辞書ブロック)の終了 header size: {Bit2Byte.count_byte}byte")
return bib
#
@classmethod
def load_haeder(cls, bin_haed:Byte2Bit):
''' ChbBlock.BB_sizeと、ChbBlock.bit_range[0]から[5]の読み込みと
DuplicateCount.dict_table の生成と読み込み。
'''
if ChbBlock.debug: print("------------------------------ヘッダー部 復元")
bib=bin_haed #bib = BitByte(bin_haed)
if bib.getdata(1) != 0: raise Exception("Version does not match") # 未決定仕様
ChbBlock.BB_size = bib.getdata(5) #5bit取り出し
if ChbBlock.debug: print(f"「ビット長のテーブル」要素のサイズ:{ChbBlock.BB_size} bit")
for i in range( len(cls.bit_range) ):
b=bib.getdata( ChbBlock.BB_size ) # BB bit取り出し
cls.bit_range[5-i]=b+1
if ChbBlock.debug: print(b+1, end=" ")
dict_len=bib.getdata(5) # 0〜31 # 使用辞書の個数取得
DuplicateCount.dict_table=[None]*dict_len
if ChbBlock.debug: print(f":使用するビット長\nDIB(辞書ブロック)の個数:{dict_len}")
for i in range( dict_len ):
num_bit = bib.getdata(1)
if num_bit == 1:
num_bit2 = bib.getdata(1) # 0のはず
if num_bit2 != 0: raise Exception("Only 1 to 2 dictionary sizes are supported.")
word1 = bib.getdata(cls.bit_range[5])
word2 = None
if num_bit == 1:
word2 = bib.getdata(cls.bit_range[5])
DuplicateCount.dict_table[i] = DictElement2(num_bit, word1, word2)
if word2 == None: word2 =""
if ChbBlock.debug: print(f"DICT[{i}]: {word1} {word2}")
#
class DictElement2: # 2つ要素までを持つDuplicateCount.dict_tableの辞書要素
def __init__(self, num_bit, word1, word2=None):
self.num_bit=num_bit # 0, 1 の値で、1の時は、word1とword2の2つを利用
self.word1=word1 # word1
self.word2=word2
class DuplicateCount: # 重複データを数えて記憶
MIN_DIFF_BIT=13 # 辞書を適用するビットサイズ(これ以上のビットサイズに適用)
COUNT_FOR_EXTRACT = 10 # 辞書候補の重複カウント数(これ以上のビットサイズに適用)
idxs = [] # extractメソッドで設定されるDuplicateCountの辞書要素のリスト(31個以内)を記憶するバッファ
count_BUFFER1=0 # set_dupli_countメソッドで使われる作業用(データ格納用添え字)
#
def __init__(self, idx):
self.count1=0 # 1ワード重複数 (uint8)
self.count2=0 # 2ワード重複数 (uint8)
self.idx = idx # データ群の中で、最初に見つかる位置
pass
#
def __str__(self):
return f"buffer[{self.idx:4}]:count1={self.count1:3},count2={self.count2:3}"
#
@classmethod
def set_dupli_count(cls, data_buffer:list, list_dupli_count:list, uint_data:int):
''' list_dupli_countで重複データを数える '''
if cls.count_BUFFER1 == 0: # 先頭データの場合
ChbBlock.get_sequence_bit_index(uint_data, data_buffer) #, list_dupli_count)
else:
ChbBlock.get_sequence_bit_index(uint_data)
list_dupli_count[cls.count_BUFFER1].count1=1
i=0
flag_matched=False
if cls.count_BUFFER1 > 0: prev_data = data_buffer[cls.count_BUFFER1-1] # 一つ前の値
while i < cls.count_BUFFER1:
if data_buffer[i]==uint_data: # 一致
if flag_matched == False: list_dupli_count[i].count1+=1
flag_matched=True
if cls.count_BUFFER1 > 0:
if data_buffer[i-1]==prev_data:
list_dupli_count[i-1].count2+=2
break
i+=1
#
cls.count_BUFFER1+=1
@classmethod
def extract(cls, list_dupli_count):
''' 辞書候補のリストを抽出 ist_dupli_countの添え字リストを cls.idxs に設定
ist_dupli_countリストを降順に32個並べて、それを 辞書候補のリストとする。'''
idxs=[None]*32 # 抽出要素分のリスト
icount=0
big=len(list_dupli_count)*2 # 並び替え時に、作業中の最大値として使う
while icount < 32:
''' big よりより小さい要素で、最大の要素と、その添え字を求める '''
maxcount=0 # 範囲内の最大重複数を求めるためのループ内作業用
imax=-1
for i, obj in enumerate(list_dupli_count):
if max(obj.count1, obj.count2) >= big: continue
flag1= imax==-1 and max(obj.count1, obj.count2) > 1
gv=max( list_dupli_count[i].count1, list_dupli_count[i].count2)
flag2=False
if imax!=-1 and maxcount < gv: flag2=True
if flag1 or flag2:
imax=i
maxcount=gv
continue
big=maxcount # 上記ループで得られた最大重複数
if big < cls.COUNT_FOR_EXTRACT: break # 重複数が登録候補に満たない
if imax==-1: break # 候補が見つからない
for i, obj in enumerate(list_dupli_count):
# maxcountと同じと判断する要素添え字をldxsに記憶
if maxcount == max( list_dupli_count[i].count1, list_dupli_count[i].count2):
if icount > 0 :
if idxs[icount-1].count2 >= idxs[icount-1].count1:
if idxs[icount-1].idx+1 == list_dupli_count[i].idx: continue
idxs[icount]=list_dupli_count[i]
icount+=1
if icount == 32: break # Full
#
i=0
while i<31 and idxs[i]!=None: i+=1
cls.idxs = idxs[:i] # idxs[X].idxが昇順のDuplicateCountリストになっている。
def ume_compress_out( data_buffer:list, list_dupli_count:list, bib:Bit2Byte):
''' list_dupli_countの辞書リストも使って、bib に圧縮する '''
ChbBlock.initialize()
i=0
data_buffer_len = len(data_buffer)
while i < data_buffer_len:
uint_data = data_buffer[i]
uint_data_next = None if i+1 >= data_buffer_len else data_buffer[i+1]
if i == 0: # 先頭データの場合
ChbBlock.get_sequence_bit_index(uint_data, data_buffer, bit2byte=bib)
else:
diff = uint_data - ChbBlock.prev_data
bit_size_idx = ChbBlock.get_bit_index(diff)
if list_dupli_count and ChbBlock.flag_size_confirmed : # 辞書参照可能なら登録
idx_ref = -1 # 辞書参照の添え字(0以上の値で有効)
n_word = 1 # 参照ワード数
for ref in list_dupli_count:
if uint_data == data_buffer[ref.idx]: # 辞書参照?
if ref.count2 >= ref.count1: # 2ワード参照
if uint_data_next != None and uint_data_next == data_buffer[ref.idx+1]:
n_word = 2 # 2 word の辞書参照
if ChbBlock.bit_range[bit_size_idx]*2 < DuplicateCount.MIN_DIFF_BIT:continue
idx_ref = ref.idx
break
else: # 1ワード参照
if ChbBlock.bit_range[bit_size_idx] < DuplicateCount.MIN_DIFF_BIT: continue
idx_ref = ref.idx
break
if idx_ref > 0: # 辞書参照の添え字
if ChbBlock.prev_CHB:
nn_word = ChbBlock.prev_CHB.out_CHB_bitimage() # 辞書データ登録前に、前のCHBを送出
ChbBlock.prev_CHB.flush_i32(nn_word) # 記憶したデータを送出
# 辞書データ登録
ChbBlock.prev_CHB = ChbBlock( ChbBlock.prev_bit_range_idx, i , idx_ref)
i+=1
ChbBlock.index_buff+=1
ChbBlock.prev_data = data_buffer[ref.idx]
if n_word == 2:
i+=1
ChbBlock.index_buff+=1
ChbBlock.prev_data = data_buffer[ref.idx+1]
continue
ChbBlock.get_sequence_bit_index(uint_data)
i+=1
#
ChbBlock.get_sequence_bit_index_flush() # フィニッシュ
def ume_compress( datas : list, callback ):
ChbBlock.initialize()
numb=len(datas)
data_buffer=[0]*numb # 1次処理のデータバッファ(パス1でデータをここに記憶、パス2はこれを読んで圧縮)
list_dupli_count=[DuplicateCount(n) for n in range(numb)] # 重複データここに記録
DuplicateCount.count_BUFFER1=0
for x in datas:
DuplicateCount.set_dupli_count(data_buffer, list_dupli_count, x) # xを記憶する繰り返し
ChbBlock.get_sequence_bit_index_flush() # フィニッシュ
#if ChbBlock.debug: print("重複状況:", [str(x) for x in list_dupli_count] )
DuplicateCount.extract(list_dupli_count) # 辞書候補の添え字群をDuplicateCount.idxsにセット
if ChbBlock.debug:
for n , obj in enumerate(DuplicateCount.idxs):
if obj == None: continue
print(f"{n:3} 辞書候補:", obj)
bib=ChbBlock.get_header_bitbyte( DuplicateCount.idxs, datas, callback )
ume_compress_out( datas, DuplicateCount.idxs, bib )
def sub_decompress(bib):
''' bib のbitByte型を解凍して、データリストを返す bibは、ヘッダーを読み取った直後の状態 '''
data2 = [] # 解凍記憶データ
idx2=0 # 上記用添え字
chg_bit_size=ChbBlock.bit_range[5]
base_value=bib.getdata(chg_bit_size) # 現在の設定値
if ChbBlock.debug: print(f"[{idx2:4}][BAB] :{base_value:5},{chg_bit_size:2}bit")
idx2+=1
data2.append( base_value )
while True:
n32=0
chb = bib.getdata(3) # CHB 読み取り
if chb < 0 : raise Exception("CHB not match") #
if chb == 0: # 32個連続
n32=32
if ChbBlock.debug: print(f"CHB {n32}個")
elif chb == 0b111: # 辞書テーブル参照
dict_idx = bib.getdata(5) # 辞書添え字
n32=bib.getdata(5) # 同じサイズが連続する個数
dict_element2 = DuplicateCount.dict_table[dict_idx]
data2.append( dict_element2.word1 )
base_value = dict_element2.word1
if ChbBlock.debug: print(f" [CHB][{idx2:4}]:{dict_element2.word1:5}", end="")
idx2+=1
if dict_element2.num_bit == 1:
data2.append( dict_element2.word2 )
base_value = dict_element2.word2
if ChbBlock.debug: print(f" ,{dict_element2.word2}", end="")
idx2+=1
if ChbBlock.debug: print(f" dict_idx={dict_idx} {n32}個")
if n32 == 0: continue
else:
chg_bit_size=ChbBlock.bit_range[chb-1]
n32=bib.getdata(5)
if ChbBlock.debug: print(f" [CHB] bit size={chg_bit_size} {n32}個")
#
if n32 == 0 : break # 終了
for n in range(n32):
if chg_bit_size==ChbBlock.bit_range[5]: # [BAB]の復元
base_value = bib.getdata(chg_bit_size)
if ChbBlock.debug: print(f"[{idx2:4}][BAB] :{base_value:5},{chg_bit_size:2}bit")
else: # [DAB]の復元
diff= bib.getdata(chg_bit_size)
diff2 = to_int(diff, chg_bit_size)
base_value += diff2
if ChbBlock.debug: print(f"[{idx2:4}][DAB] :{base_value:5},{chg_bit_size:2}bit diff={diff2:2}")
#
idx2+=1
data2.append( base_value )
#
return data2
def ume_decompress( buff : list):
if ChbBlock.debug: print("\n\n---------- 解凍 --------------")
bib=Byte2Bit(buff)
ChbBlock.load_haeder(bib) # ヘッダ部の復元
if ChbBlock.debug: print("------------------------------ヘッダ部の復元の終了")
buff2=sub_decompress(bib)
return buff2
import os
import sys
import pathlib
if __name__ == '__main__':
#ChbBlock.debug = True
ChbBlock.SBC_NUMB=9 #1 # パス1用 同一ビットサイズの連続個数(この数以上並ぶ場合にビットサイズを縮小)
#DuplicateCount.MIN_DIFF_BIT=8 # 辞書を適用するビットサイズ(デフォルト:13)
#DuplicateCount.COUNT_FOR_EXTRACT=8 # 辞書候補の重複カウント数
datas=[40000, 34000, 29000, 30000, 36000, 35000, 30000, 31000 ]
# datas+=[40000, 34000, 29000, 30000, 36000, 35000, 34001, 33000, 31000 ,36000, 35000, 33000]
# datas+=[5,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,1,2]
# datas+=[3]
# datas+=[4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,1,0,1,2,3,4,5,6,7,8,9,0,2,3,4,5,6,7,8,9,]
# datas+=[40000, 34000, 29000, 30000, 36000, 35000, 34001, 33000, 31000 ,36000, 35001]
# datas+=[40000, 34000, 29000, 30000, 36000, 35000, 34001, 33000, 31000 ,36000, 35000]
# datas+=[40000, 34000, 29000, 30000, 36000, 35000, 34001, 33000, 31000 ,36000,35001, 2]
# datas=datas[::-1]
import matplotlib.pyplot as plt
plt.plot(datas)
plt.show() # データ確認用
print(datas, f"個数:{len(datas)}") # 圧縮対象リスト表示
bin_data=bytearray() # 圧縮データを記憶するバイト列
def callback( bt ): # 圧縮処理で呼び出されるコールバック関数
# print( f"[{bt:08b}]", end=" ")
bin_data.append(bt)
ume_compress(datas, callback) # 圧縮! callbackでbin_dataのbytearrayに追加
data2=ume_decompress( bin_data ) # bin_dataのbytearrayを復号して、data2に記憶
print(f"元サイス:{len(datas)*2}byte, 圧縮サイス:{len(bin_data)}byte")
if datas == data2: print("元データと圧縮データが、完全に一致")
else:
for i in range(len(datas)):
if datas[i] == data2[i]: print(f"一致[{i}]:{datas[i]} ")
else : print(f"不一致[{i}]:{datas[i]}!={data2[i]} ")
print(f"ChbBlock.SBC_NUMB={ChbBlock.SBC_NUMB}")
print(f"DuplicateCount.MIN_DIFF_BIT={DuplicateCount.MIN_DIFF_BIT}")
print(f"DuplicateCount.COUNT_FOR_EXTRACT={DuplicateCount.COUNT_FOR_EXTRACT}")
print(f"辞書={len(DuplicateCount.idxs)}")
print("\n以上がtest1()の実行結果です。")
''' umcmprs.py の検証用
'''
from umcmprs import Bit2Byte # bitに圧縮してbyteにするクラス
from umcmprs import Byte2Bit # byteから解凍してbitにするクラス
out_func=print # 圧縮でバイトに詰められた時に実行する関数にprintを登録
barray=bytearray() # 圧縮したバイト列記憶用
def callback( data ): # 圧縮でbyteに詰められた時に実行するコールバック関数
out_func( f"圧縮した{data:08b}を記憶" , end=" " )
barray.append( data )
b_byte=Bit2Byte( callback ) # 圧縮オブジェクト
BIT_SIZE=6 # 記憶で使うビットサイズ
b_byte.setdata(8, BIT_SIZE) # 8を圧縮
b_byte.setdata(9, BIT_SIZE) # 9を圧縮
b_byte.setdata(10, BIT_SIZE) # 10を圧縮
b_byte.setdata(11, BIT_SIZE) # 11を圧縮
b_byte.setdata(7, BIT_SIZE) # 7を圧縮
b_byte.padding() # byte列になるように埋める
print(f"\n圧縮したバイト列:{barray}")
b_bit=Byte2Bit( barray )
for n in range(5):
data=b_bit.getdata( BIT_SIZE )
print( f" 復号データ:{data}" , end=" ")
実行例
圧縮した01001000を記憶 圧縮した10100010を記憶 圧縮した00101100を記憶 圧縮した00000111を記憶 圧縮したバイト列:bytearray(b'H\xa2,\x07') 復号データ:8 復号データ:9 復号データ:10 復号データ:11 復号データ:7
from umcmprs import ChbBlock,DuplicateCount
ChbBlock.debug = True
#ChbBlock.SBC_NUMB=15 #1 # パス1用 同一ビットサイズの連続個数(この数以上並ぶ場合にビットサイズを縮小)
#DuplicateCount.MIN_DIFF_BIT=8 # 辞書を適用するビットサイズ(デフォルト:13)
#DuplicateCount.COUNT_FOR_EXTRACT=11 # 辞書候補の重複カウント数
from umcmprs import ume_compress,ume_decompress
# テストデータ生成
NUMB=1024
NUMB=2048
lst=[int(y) for y in range(NUMB)]
datas=[]
with open("test.bin", "wb") as fw:
while True:
y=0
for x in lst:
y+=x
if y > 65535 : break # uint16 以内
datas.append(y)
fw.write(y.to_bytes(2, 'little')) # 2バイトでリトルエンディアン
if len(datas) >= NUMB: break
if len(datas) >= NUMB: break
# -------圧縮-------
fw=open("test.uprs", "wb")
bin_data=bytearray() # 圧縮データを記憶するバイト列
def callback( bt ): # 圧縮処理で呼び出されるコールバック関数
# print( f"[{bt:08b}]", end=" ")
bin_data.append(bt)
byte_data=bt.to_bytes(1, 'little')
#print(bt, type(bt), byte_data, type(byte_data))
fw.write(byte_data)
ume_compress(datas, callback) # 圧縮! callbackでbin_dataのbytearrayに追加
fw.close()
# -------"test.uprs"のファイルを復号(解凍)-------
with open("test.uprs", "rb") as fr:
bin_data=fr.read()
data2=ume_decompress( bin_data ) # bin_dataのbytearrayを復号して、data2に記憶
#exit()
print(f"元サイス:{len(datas)*2}byte, 圧縮サイス:{len(bin_data)}byte")
if datas == data2: print("元データと圧縮データが、完全に一致")
else:
for i in range(len(datas)):
if datas[i] == data2[i]: print(f"一致[{i}]:{datas[i]} ")
else : print(f"不一致[{i}]:{datas[i]}!={data2[i]} ")
print(f"ChbBlock.SBC_NUMB={ChbBlock.SBC_NUMB}")
print(f"DuplicateCount.MIN_DIFF_BIT={DuplicateCount.MIN_DIFF_BIT}")
print(f"DuplicateCount.COUNT_FOR_EXTRACT={DuplicateCount.COUNT_FOR_EXTRACT}")
print(f"辞書={len(DuplicateCount.idxs)}")
import matplotlib.pyplot as plt
plt.plot(datas) # 圧縮前のデータ
plt.plot(data2) # 圧縮データを復元したデータ(上記との違いを比較)
plt.show() # データ確認用
実行例
[ 0][BAB] : 0 [CHB] bit size=7 31個 [ 1][DAB] : 1,7bit,diff=1, i32=0 [ 2][DAB] : 3,7bit,diff=2, i32=1 ・・・・・・ [2047][DAB] :28203, 9bit diff=237 [CHB] bit size=16 0個 元サイス:4096byte, 圧縮サイス:2561byte 元データと圧縮データが、完全に一致 ChbBlock.SBC_NUMB=15 DuplicateCount.MIN_DIFF_BIT=13 DuplicateCount.COUNT_FOR_EXTRACT=10 辞書=31

from umcmprs import ChbBlock,DuplicateCount
#ChbBlock.debug = True
#ChbBlock.SBC_NUMB=15 #1 # パス1用 同一ビットサイズの連続個数(この数以上並ぶ場合にビットサイズを縮小)
#DuplicateCount.MIN_DIFF_BIT=8 # 辞書を適用するビットサイズ(デフォルト:13)
#DuplicateCount.COUNT_FOR_EXTRACT=11 # 辞書候補の重複カウント数
from umcmprs import ume_compress,ume_decompress
import math
import numpy as np
import random
BUF_SIZE=1024*3
# frequency Hzで16Bitのサンプリングでリトルエンディアン符号無しの1000Hz正弦波データを1024ワード並ぶファイルを、
# 16Bit_1KHz_1024_sin.binの名前で作る。そして 振幅は、0〜1024で変化する生成にする。
# このファイルのバイト数は、BUF_SIZE ワードなので、(BUF_SIZE*2)byteになる。
# 1024ワードで16K時の全体の時間は 『(1/16KHz)*1024=(1/16e3)*1024=0.064秒』と計算できる。
def create_sin(inc , noise, frequency=1000 , sample_rate = 16000):
if not inc:
singen=lambda sec : math.sin( math.pi * 2 * frequency * sec )*(2**15-1) + (2**15-1)
else:
singen=lambda sec : math.sin( math.pi * 2 * frequency * sec )*(2**15-1)*(sec/0.65) + (2**15-1) # ★ 徐々に大きくする
ts = [ 1/sample_rate * t for t in range( BUF_SIZE )]
ys = [ int(singen( t )) for t in ts ]
if noise != 0:
ys = [ int(singen( t ))+int(np.random.uniform(-noise,noise)) for t in ts ] # ★ ノイズを加える
ys = [ t if t <= 65535 else 65535 for t in ys]
ys = [ t if t >= 0 else 0 for t in ys]
bs=np.array(ys, np.uint16)
buffer=b''.join(bs) # バイナリに変換
filename=f"16bit{frequency}Hz_{BUF_SIZE}sin_X.bin"
print(f"生成ファイル:{filename}")
with open(filename,"wb") as fw:
fw.write( buffer )
return ys # np.frombuffer(bs, dtype="int16")
NN=10000 # この数だけ検証データを生成してチェックする繰り返しを行う。
NN=1 # 1回だけの生成とチェック
seed=13 # 乱数シード
for nn in range(NN):
seed=int(random.random() * 1000)
random.seed(seed)
np.random.seed(seed)
reverse=True if random.random() > 0.5 else False
inc = "inc" if random.random() > 0.5 else ""
frequency = int(random.random() * 1000)
noise= int(random.random() * 100)
#
#特定のパラメタ検証用(上記乱数のパラメタ設定を、指定の値で上書する場合、下記のコメントを外す)
seed,reverse,inc,frequency,noise = 886,False,"inc",300,10
#
# 検証用データを生成してdatasに記憶する
datas=create_sin(inc,noise,frequency,sample_rate=16000)
if reverse :
datas=datas[::-1]
#print(datas,type(datas))
print(f"seed={seed}, reverse={reverse} {inc} frequency={frequency},noise={noise}, nn={nn}----")
#
# -------圧縮-------
fw=open("test.uprs", "wb")
bin_data=bytearray() # 圧縮データを記憶するバイト列
def callback( bt ): # 圧縮処理で呼び出されるコールバック関数
# print( f"[{bt:08b}]", end=" ")
bin_data.append(bt)
byte_data=bt.to_bytes(1, 'little')
#print(bt, type(bt), byte_data, type(byte_data))
fw.write(byte_data)
ume_compress(datas, callback) # 圧縮! callbackでbin_dataのbytearrayに追加
fw.close()
# -------復号(解凍)-------
data2=ume_decompress( bin_data ) # bin_dataのbytearrayを復号して、data2に記憶
#exit()
print(f"元サイス:{len(datas)*2}byte, 圧縮サイス:{len(bin_data)}byte")
if datas == data2: print("元データと圧縮データが、完全に一致")
else:
for i in range(len(datas)):
if datas[i] == data2[i]: print(f"一致[{i}]:{datas[i]} ")
else : print(f"不一致[{i}]:{datas[i]}!={data2[i]} ")
break
print(f"ChbBlock.SBC_NUMB={ChbBlock.SBC_NUMB}")
print(f"DuplicateCount.MIN_DIFF_BIT={DuplicateCount.MIN_DIFF_BIT}")
print(f"DuplicateCount.COUNT_FOR_EXTRACT={DuplicateCount.COUNT_FOR_EXTRACT}")
print(f"辞書={len(DuplicateCount.idxs)}")
if NN <= 10 :
import matplotlib.pyplot as plt
plt.plot(datas)
plt.plot(data2)
plt.show() # データ確認用
実行例
生成ファイル:16bit300Hz_3072sin_X.bin seed=886, reverse=False inc frequency=300,noise=10, nn=0---- 元サイス:6144byte, 圧縮サイス:4182byte 元データと圧縮データが、完全に一致 ChbBlock.SBC_NUMB=15 DuplicateCount.MIN_DIFF_BIT=13 DuplicateCount.COUNT_FOR_EXTRACT=10 辞書=0
