このページで示した[Raspberry Pi]のサービスのstart-app-select.pyから呼び出されるstart-app-select.pyのコードです。
(Raspberry Pi 3 Model A+]に接続した[UMEHOSHI ITA]基板を乗せるモータ付き台車で使うコードです)
suzuki@raspberrypi:~ $ ls -l /dev ・・省略・・・ crw-rw---- 1 root dialout 166, 0 Dec 14 12:23 ttyACM0 crw-rw---- 1 root dialout 204, 64 Dec 14 12:23 ttyAMA0 ・・省略・・・調べた結果、Type A のUSBのシリアル通信デバイスを指すファイル名はttyACM0と分かった。
#!/usr/bin/python3
# 「/usr/local/apps/umeusb.py」
import serial
import sys
import time
def defalut_usb_receive_func(bin):
'usb受信のイベントで実行するデフォルト処理'
print( bin.decode('utf-8') ,end="") # USBより受信したデータを表示
usb_receive_func = defalut_usb_receive_func # usb受信で実行する処理
# 上記を変更することで、USB受信に対する振る舞いを変更できる。
"usb:このグローバル変数が指し示す接続相手とUSB通信するためのオブジェクト"
"imort時にオープンされて、下記各関数はそれを利用する。"
usb = None
def init_sub():
global usb
try:
usb = serial.Serial(port = '/dev/ttyACM0', baudrate = 115200)
#usb = serial.Serial(port = 'COM8', baudrate = 115200,timeout = 10)
except Exception as e:
print(e)
sys.exit(1)
print( usb ) # USB のシリアルオブジェクト表示
def check_sum(s):
''' b = check_sum("G10800090000067")
'''
ck = 0
n = len(s)
if n == 0: return
for i in range(n-2):
ck += ord(s[i])
# print(ck)
# print(s[i] , end ="")
# print(' ',s[-2:] , end ="")
ck += int(s[-2:] , 16)
ck &= 0x0f
if ck != 0 :
print("check sum error", ck)
return False
return True
def usb_send(s, quantity:int=2, endstr:bytes=b"\n")->bytes:
''' b = usb_send("G10800090000067", endstr=b"\n",quantity=1):
sを送信して、受信したバイナリを返す。タイムアウト時は""を返す
endstrがquantity個受信した時点でリターンする。
quantity=2のデフォルト値は、送信した1行に対するエコー文字列の1行と
応答文字列の1行を意味します。
'''
if check_sum(s) == False: return b""
usb.write(s.encode('utf-8'))
usb.write(b'\r\n')
rtnval=b""
while True:
if quantity <= 0:
return rtnval # 受信したバイナリーを返す。
b=usb.readline()
if b == "": return rtnval
rtnval+=b
n=b.count(endstr)
quantity-=n
def send_cmd( ss, quantity:int=2, endstr:bytes=b"\n", cmdchar:str="SRG"):# sの例[S048000800000FFAF000086]
'''# 複数の'\n'を含む文字列をusbに送信する。'''
a = ss.split('\n')
for s in a:
s=s.strip()
if s == "" or not s[0] in cmdchar : continue # S R G 以外のコマンドを無視
b = usb_send(s, quantity, endstr) # デフォルトで2行のudb応答文字列の受信を含むコマンド出力
usb_receive_func( b ) # USB受信で得られたbで行う処理
def send_cmdfile(filepath, cmdchar="SRG"):
with open(filepath , "r", encoding="utf-8") as f:
ss = f.readlines()
if filepath.endswith(".umh") : ss = ss[1:] # 先頭のボタン仕様行を除去
ss="\n".join(ss)
#print(ss)
send_cmd( ss, quantity=0, endstr=b"\n", cmdchar=cmdchar)
# USB受信スレット例(b'end\r\n'受信まで繰り返す)
def read_loop():
while True:
try:
b=usb.readline() # binary
if(b == b'end\r\n'):
print("recieve 'end'")
break
usb_receive_func( b ) # USB受信の処理
except Exception as e:
#print(e)
break
#
print("end read_loop()")
if __name__ == "__main__": # キー入力した「UME専用Hexコマンド」を送信して[UMEHOSHI ITA]を制御するテスト用
# スレッドに read_loop 関数を渡して実行を始める
init_sub()
import threading
t_id = threading.Thread(target=read_loop)
t_id.start()
s='G10800090000067' # [80009000]番地から16byteのメモリ内容の取得
usb_send( s, quantity=0 )
send_cmdfile( 'app_pwm_esp32.umh' )
#send_cmdfile( 'uBeep.umh' )
#send_cmdfile( 'uStartInit.umh' )
time.sleep(3) # この待機中で、UMEHOSHI ITA からのUSBデータ(16byteメモリー情報)を表示すればOK
usb.close()
print("Ended !")
上記のmainの実行で、'G10800090000067'+'\r\n'を送って、応答のメモリ内容を表示できればOKです。
#!/usr/bin/python3
# 「/usr/local/apps/umetcp.py」
import os
import sys
import socket
import netifaces
receive_folder="./" # 受信データの記憶位置
receive_folder="/usr/local/apps/" # 受信データの記憶位置
def get_wlan0_ip():
# インターフェース 'wlan0' のアドレス情報を取得
# netifaces.AF_INET は IPv4 アドレスを意味します
addresses = netifaces.ifaddresses('wlan0') # Windows非対応
if netifaces.AF_INET in addresses:
# IPv4 アドレスのリストの最初の要素を取得
ip_info = addresses[netifaces.AF_INET][0]
return ip_info['addr']
else:
return None
def defalut_tcp_recieve_file(filepath):
'tcpでファイルを受信した直後の処理'
filesize = os.path.getsize(filepath)
# 受信したファイル名とサイズを表示
print( "recieve file:{},size:{}".format(filepath, filesize))
tcp_receive_file_func = defalut_tcp_recieve_file # tcpファイル受信で実行する処理
def defalut_tcp_recieve_message(msg):
print( msg )
tcp_receive_message_func = defalut_tcp_recieve_message # tcpメッセージ受信で実行する処理
def send_file(sock, filepath):# filepathのファイルをクライアントに送信(大きなフィルにも対応)
try:
filename = os.path.basename(filepath)
file_info = os.stat(filepath)
filesize = file_info[6]
s = "f{} {}".format(filename, filesize)
bin=(s+"\r\n").encode("utf-8")
print(s) # 名前とサイズの送信文字列
sock.sendall(bin) # send filename filesize
CHUNK = 100000 # この数ずつ送る(必要に応じて変更)
remaining_sizes=filesize
with open(filepath, "rb") as f:
while True:
print(f"Remaining sizes:{remaining_sizes}")
size=CHUNK if remaining_sizes >= CHUNK else remaining_sizes
data = f.read(size)
sock.sendall(data)
remaining_sizes -=len(data)
if remaining_sizes == 0: break
#
#
print(f"send_file:{filepath} end." )
except Exception as e:
print("send_file Error:"+ str(e))
def send_message(sock,msg):
bin=('M'+msg+"\r\n").encode("utf-8")#binaryへ変換
sock.sendall(bin)#一括送信
def recieve_file(sock): # 受信ファイル (ファイル名にスペースを含む場合にも対応)
buf=b""
while True:
bin = sock.recv(1)#1byte受信
if len(bin) != 1: break
buf += bin
if buf[-2:] == b"\r\n":# 1行の文字列受信終了?
s = buf[0:-2].decode('utf-8')#バイナリから
iSpace=s.rfind(' ')
filename, filesize = s[0:iSpace], int(s[iSpace+1:])
print(filename, filesize) # 受信ファイルとサイズ確定
CHUNK = 100000
remaining_sizes=filesize
with open( receive_folder + filename, "wb") as f:
bin=b""
while remaining_sizes > 0:
print(f"Remaining sizes:{remaining_sizes}")
size=CHUNK if remaining_sizes >= CHUNK else remaining_sizes
bin = sock.recv(size) # ファイルの受信
f.write(bin)
remaining_sizes -= len(bin)
#
tcp_receive_file_func( receive_folder + filename) # ファイル受信直後の処理
break
def recieve_message(sock):
buf=b""
while True:
bin = sock.recv(1)#1byte受信
if len(bin) != 1: break
buf += bin
#print("----", buf)
if buf[-2:] == b"\r\n":# 1行の文字列受信終了?
s = buf[0:-2].decode('utf-8')#バイナリから
tcp_receive_message_func(s) # 受信文字列の対処
break
def receiveData(sock):
print( "start receiveData" )
while True:
bin = sock.recv(1)#1byte受信
if len(bin) != 1: break
if bin == b"f" : recieve_file(sock)
elif bin == b"M" : recieve_message(sock)
else:
print(bin , end="")
#
print( "receiveData ended." )
if __name__ == "__main__":
server_addr =('192.168.0.110', 59154) # 接続先をコードで指定する
server_addr =(get_wlan0_ip(), 59154) # RaspberryでSTAやATモードに対応時に使う
print("Serverの情報:",server_addr)
serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversock.bind(server_addr) # IPとポート番号を指定します
print("接続要求を待つ")
serversock.listen()
sock, address = serversock.accept()#サーバの接続受け入れ
try:
receiveData( sock ) # 命令を受信して処理するループ
except Exception as e:
print(e)
sock.close()
serversock.close()
上記サーバーを実行すると、その"/usr/local/apps/"のディレクトリに、受信したファイル群を記憶できます。
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# TCPサーバープログラム(/usr/local/apps/raspiAPumeTcp.py)
# [Raspberry Pi 3 Model A+]と[UMEHOSHI ITA]を乗せたモータ付き台車のサービスから呼び出される
import board
import busio
from adafruit_ssd1306 import SSD1306_I2C
from PIL import Image, ImageDraw, ImageFont
import time
i2c = busio.I2C(board.SCL, board.SDA)# --- I2C初期化 ---
# --- SSD1306ディスプレイ初期化 (128x64の場合) ---
oled = SSD1306_I2C(128, 64, i2c)
# --- 画面をクリア ---
oled.fill(0)
oled.show()
# --- Pillowで描画領域を作成 ---
image = Image.new("1", (oled.width, oled.height))
draw = ImageDraw.Draw(image)
font = ImageFont.load_default()# --- フォント設定 ---
def draw_text(txt: str, row=0, font=font, fill=255):
# --- テキスト描画 (0=黒、255=白)上記設定で、横21文字---
draw.text((0, row*20), txt , font=font, fill=255)
# --- 画面に表示 ---
oled.image(image)
oled.show()
def oled_clear():
oled.fill(0) # --- クリア
oled.show() # ---表示
import os
import umetcp
from umetcp import send_message, receiveData
import socket
import umeusb
import traceback
sock = None # クライアントと通信するソケット
def my_tcp_receive_file_func(filepath):
''' 受信したumehoshiアプリ用「.umh」のファイルより、
「UME専用Hexコマンド」の文字列をusbへ出力する'''
name,ext = os.path.splitext(filepath)
if not ext == ".umh" : return
with open( filepath ) as f: ss=f.readlines()
ss="".join(ss[1:])
print(ss)
umeusb.send_cmd(ss, quantity=0) # TCPで受信した 「.umh」データを[UMEHOSHI ITA]へ送る
umetcp.tcp_receive_file_func = my_tcp_receive_file_func # tcpファイル受信データの処理を置き換え
def my_tcp_recieve_message(msg):
umeusb.send_cmd(msg)
umetcp.tcp_receive_message_func=my_tcp_recieve_message # TCO受信文字列(UME専用Hexコマンド)処理を置き換え
def my_usb_receive_func(bin):
'usb受信のイベントで実行するデフォルト処理'
global sock
#print("-----------",bin)
ss = bin.decode('utf-8')
a=ss.split('\n')
for s in a:
s = s.strip()
if sock != None:
print(s)
send_message(sock, s) # [UMEHOSHI ITA]からの応答メッセージをTCPで返す
else: print( s )
umeusb.usb_receive_func = my_usb_receive_func # USB受信データの処理を置き換える。
umeusb.init_sub()
import threading
t_id = threading.Thread(target=umeusb.read_loop)
t_id.start()
oled.contrast(128) # 0?255
draw_text(f"UMEHOSHI ITA",0)
#ip="192.168.4.1"
ip=umetcp.get_wlan0_ip() # IPアドレス取得
while ip == None:
ip=umetcp.get_wlan0_ip()
if ip : break
time.sleep(0.1)
umeusb.send_cmdfile("/usr/local/apps/uStartInit.umh") # ロボット初期化
server_addr =(umetcp.get_wlan0_ip(), 59154)
hostname=socket.gethostname()
draw_text(f"{hostname}",1)
draw_text(f"{server_addr[0]},{server_addr[1]}",2)
print("Serverの情報:",hostname, server_addr)
serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversock.bind(server_addr) # IPとポート番号を指定します
print("接続要求を待つ")
serversock.listen()
while True:
print("接続を待って、接続してきたら許可")
sock, address = serversock.accept()#サーバの接続受け入れ
print("接続相手:",address)
try:
receiveData( sock ) # 受信ループ
except Exception as e:
print(f"例外の種類: {type(e)}")
print(f"スタックトレース: {traceback.format_exc()}")
print(e, "sock.close()")
sock.close()