TOP PAGE

Raspberry Pi Pico WのMicroPythonで、遠隔制御用のTCPサーバーを構築する

右のように、Raspberry Pi Pico Wへ電源さえ供給すれば、Wifiを介してPCからPico WをMicroPythonで遠隔操作するサーバー構築を紹介します。

目標はサーバーを介した次の遠隔操作です。
Raspberry Pi Pico Wへの任意ファイルのアップロード、ダウンロード、 ルートディレクトリのリスト情報出力、任意テキストファイル内容の情報出力、ファイルの削除、pythonモジュールの実行、サーバーの終了指定。
最終的に、Thonny(トニー)を使わないで、Wifiを介した遠隔操作でRaspberry Pi Pico Wのソースを変更、実行できるようになります。

この遠隔操作を実現するコードは、このリンク先で紹介したESP32の遠隔制御用のTCPサーバーで 使ったコードを参考に作っています。

Raspberry Pi PicoにはBOOTSELボタンがあり、電源投入時(USB接続時)に押されていると、 ストレージデバイスとして認識され起動用ファイルを入れ替えることができます。
(ストレージデバイスとして認識ている時、私の場合は、RPI-RP2のドライブ名で割り当てられて、F:ドライブとして扱えました。 この時はCOMデバイスが使えませんでした。)
使用した統合開発環境(IDE)のThonny(トニー)では、インストールの後にこの状態(電源投入時にBOOTSELボタンを押して起動した状態)で、 MicroPythonをインストールします。

具体的には左記のThonny(トニー)の画面で、MicroPythonのインストールは、 ツールメニューから「オプション」を選択して「インタプリタ」タブの選択で、 次のようにインストールしています。
右でインストールしたバージョン表示は、
3.4.0; MicroPython v1.22.2 on 2024-02-22 ですが、
その後で次バージョンのファームウェアにアップデートしました。
3.4.0; MicroPython v1.25.0 on 2025-04-15
ですが、このバージョンではマルチキャスト受信ができませんでした。
それで、現時点(2025年5月)では
3.4.0; MicroPython v1.22.2 on 2024-02-22に ダウングレードして戻しています。
以下では、このバージョンで動作確認しています。

私が使っているThonny(トニー)環境ではSTOPボタンを押すと、USB-CDCがCOM4のデバイスでRaspberry Pi Picoと通信し、 Raspberry Pi Picoのファイルが見えるようになり、 それからファイルをダブルクリックで開いて、実行ボタンで動作させような使い方ができます。

Raspberry Pi Pico Wをアクセスポイントにして、PCからの接続実験

このリンク先で紹介したESP32用に作ったsetap.pyを、次の赤文字ように変更して動作を確認しました。
ESP32用に作ったコードでessidに接続する場合はパスワードを無しにしたのですが、Raspberry Pi Pico W で無しの設定が出来なかったので、 以下ではパスワードを'abcd1234'に指定していますが、希望の通りに動作しませんでした。
import time # setap.py
import network
ap = network.WLAN(network.AP_IF) # create access-point interface
ap.config(essid='PicoW-AP', password='abcd1234') # set the ESSID of the access point
ap.ifconfig(('192.168.220.1', '255.255.255.0', '192.168.220.1', '8.8.8.8'))
# set (ip address, subnet mast, default gateway, dns ip address)
ap.active(True)         # activate the interface
上記で、Raspberry Pi Pico WのIPアドレスを'192.168.220.1'にしていますが、 このアクセスポイントに接続してもPCに192.168.220.0のネットワークのIPアドレスが割り振りされませんでした。
(私のPCでは、Wifiに接続後、PCは169.254.24.70リンクローカルアドレスになっていました。 これはDHCP からIPアドレスを取得できなかったときの自動的な割り当てIPです。)

つまり、DHCPサーバーが動作していないということです。
調べた所、 Pico W の MicroPython に内蔵されている簡易DHCPサーバーは、デフォルトのIPアドレス 192.168.4.1 を使用している場合だけ正しく動作するらしいバグが、現時点(2025年5月)であるようです。
最終的に次のコードにして、動作しています。
import time # setap.py
import network
ap = network.WLAN(network.AP_IF) # create access-point interface
ap.config(essid='PicoW-AP', password='abcd1234')
ap.ifconfig(('192.168.4.1', '255.255.255.0', '192.168.4.1', '8.8.8.8'))
ap.active(True)         # activate the interface
while not ap.active():
    time.sleep(1)

遠隔制御用のTCPサーバーが自動的に起動するまでの設定

Raspberry Pi Picoではmain.pyを記述したプログラムが存在すると、usbケーブルで電源投入時に自動でmain.pyを実行するという機能が あります。
そこで、このmain.pyからサーバーのモジュールが起動するように作ります。(importで実行させます)
このリンク先で紹介したESP32用環境では起動用ファイルがboot.pyでしたが、 Pi Picoの起動用ファイルはmain.pyです。
よって、main.pyに上記アクセスポイント設定用のsetap.pyモジュールと、下記のserver.pyモジュールを起動するimportを行います。
server.pyモジュールは、 Pi Pico Wへの任意ファイルアップロード、ダウンロード、 ルートディレクトリのリスト情報出力、任意テキストファイル内容の情報出力、ファイルの削除、pythonモジュールの実行、サーバーの実行終了 をクライアントから要求できるように作っています。
このserver.pyの内容を下記で示します。
import os   # server.py Raspberry Pi Pico W を制御するサーバーこのコード
import sys
import socket
import _thread
import time
#import esp
#esp.osdebug(None) # ベンダ O/S デバッグメッセージをオフにする
from machine import Pin # Raspberry Pi Pico W の GPIOの端子操作用 
sw=Pin( 10, Pin.IN, Pin.PULL_UP ) # GPIO をプルアップ設定にしてタクトスイッチの入力用にする 
sw_value = sw.value() # GPIO10のスイッチ状態取得

path_datas = "./"
flag_listen_loop=True
sock2=None # TCPクライアントと通信するソケット

def tcp_send_message(sock,msg): # sockのクライアント側にメッセージ送信
    bin=('M'+msg+"\r\n").encode("utf-8")
    if sock : sock.sendall(bin)

def send(msg):# sock2のクライアント側にメッセージ送信
    msg=f"{msg}" # 文字列へ変換
    if sock2 is None: return
    a=msg.split("\n")
    for s in a:
        tcp_send_message(sock2,s)

def ls_filelist(sock,path): # ファイルリストの文字列をクライアントに送信
    tcp_send_message(sock,"ls_filelist:" + path)
    if path == "": path="."
    try:
        for f_name in os.listdir(path) :
            file_info = os.stat(f_name)
            file_size = file_info[6]
            d_inf = 'dir' if (file_info[0] & 0x4000 ) else '   ' # ディレクトリ判定
            msg = f"{d_inf} {file_size:7}  {f_name}"
            tcp_send_message(sock, msg)
    except Exception as e:
        tcp_send_message(sock,"ls_filelist Error:" + str(e))

def cat_filepath(sock, filepath): # filepathのテキスト内容をクライアントに送信
    tcp_send_message(sock, "cat_filepath:" + filepath)
    try:
        with open(filepath, 'r', encoding='utf-8') as fr:
            ss = fr.readlines()
        for s in ss: 
            if s[-1] == '\n' : s=s[:-1] # 改行を除く
            tcp_send_message(sock, s)
    except Exception as e:
        tcp_send_message(sock,"cat_filepath Error"+ str(e))    

def del_filepath(sock, filepath): # filepathのファイルを削除
    tcp_send_message(sock, "del_filepath:" + filepath)
    try:
        os.remove(filepath)
    except Exception as e:
        tcp_send_message(sock,"del_filepath Error:"+ str(e))    

def import_name(sock, name):# nameのモジュールのPythonソースを実行
    tcp_send_message(sock, "import_name:" + name)
    try:
        mod=__import__(name) # 動的なインポートで実行
        del sys.modules[name] # 再度インポートできるように開放
    except Exception as e:
        tcp_send_message(sock,"import_name Error:"+ str(e))

def send_file(sock, filepath):# filepathのファイルをクライアントに送信
    try:
        idx_last_sep = filepath.rfind('\\')
        filename = filepath if idx_last_sep == -1 else filepath[idx_last_sep+1:]
        file_info = os.stat(filepath)
        filesize = file_info[6]
        s = "f{} {}".format(filename, filesize)
        bin=(s+"\r\n").encode("utf-8")
        sock.sendall(bin) #send filename filesize
        if filesize > 0:
            with open(filepath, "rb") as f:
                bin = f.read(filesize)
            sock.sendall(bin) # send binary file
        #
        tcp_send_message(sock,"server:send_file:"+ filepath)  
    except Exception as e:
        tcp_send_message(sock,"server:send_file Error:"+ str(e))    

def recieve_file(sock):# TCPでファイルを受信(1byte受信の繰り返し)
    buf=b""
    while True:
        bin = sock.recv(1)
        if bin == b'': break
        buf += bin
        if buf[-2:] == b"\r\n":
            s = buf[0:-2].decode('utf-8')
            a=s.split(' ')
            filename, filesize = a[0], int(a[1])
            f=open( path_datas + filename, "wb") # 受信ファイルの保存
            n=0
            tcp_send_message(sock,f"Starting to receive {filename}:size={filesize} Please wait.")
            while n < filesize:
                bin = sock.recv(1)
                if bin == b'': break
                f.write(bin)
                n += 1
                if n % 10000 == 0: tcp_send_message(sock,f"\n{n}bytes were received.")
            #
            f.close()
            tcp_send_message(sock,f"\n{filename} size:{filesize}, {n}bytes receive end.")
            if filename.endswith(".umh"):
                send_cmdfile(sock,filename) # ファイルのコマンド内容をUMEHOSHI ITAへ送信
            break
        #
    #print(filename + ":size=" + str(filesize)) # debug ---------------

def recieve_file_Max(sock):# TCPでファイルを受信(バッファを最大して受信)
    buf=b""
    while True:
        bin = sock.recv(1)
        if len(bin) != 1: break
        buf += bin
        if buf[-2:] == b"\r\n":
            s = buf[0:-2].decode('utf-8')
            a=s.split(' ')
            filename, filesize = a[0], int(a[1])
            bin=b""
            while len(bin) < filesize:
                bin += sock.recv(filesize-len(bin))
            with open( path_datas + filename, "wb") as f: # 受信ファイルの保存
                f.write(bin)
            tcp_send_message(sock,f"Server:'{filename}' {len(bin)}bytes received.")
            if filename.endswith(".umh"):
                send_cmdfile(sock,filename) # ファイルのコマンド内容をUMEHOSHI ITAへ送信
            break
        #
    #print(filename + ":size=" + str(filesize)) # debug ---------------

def recieve_message(sock): # 受信メッセージに対する処理
    global flag_listen_loop
    buf=b""
    while True:
        bin = sock.recv(1)
        if len(bin) != 1:
            flag_listen_loop = False
            return False
        #           
        buf += bin
        if buf[-2:] == b"\r\n":
            s = buf[0:-2].decode('utf-8').strip()
            if s == "end_listen_loop": # 受信文字列がこれと一致するなら終了
                flag_listen_loop = False
                return False
            #
            if s.startswith("ls -l"): # ファイルリスト要求に対する送信処理
                ls_filelist(sock, s[5:].strip() )
                return True
            #
            if s.startswith("cat "):# ファイル内容表示要求に対する送信処理
                cat_filepath(sock, s[4:] )
                return True
            #
            if s.startswith("del "):# ファイル削除要求に対する処理
                del_filepath(sock, s[4:] )
                return True
            #
            if s.startswith("import "):# Pythonファイルの実行要求に対する処理
                import_name(sock, s[7:] )
                return True
            #
            if s.startswith("get "): # ファイル要求に対するファイル送信
                send_file(sock, s[4:] )
                return True
            #
            if s.startswith("exc "): # .umhファイルの内容で、[UMEHOSHI ITA]のUART1に出力する。
                send_cmdfile(sock, s[4:] )
                return True
            #
            if len(s)>1 and s[0] in "SRG": 
                print( s ) # 受信文字列を、UME専用Hexコマンドと判断して、UMEHOSHI ITAへ送信
            else:
                tcp_send_message(sock,s) # 受信文字列を、TCPクライアントへECHO送信
            #
            return True

def receiveData(sock): # 受信処理ループ
    # tcp_send_message(sock,"start sever receiveData" )
    while True:
        bin = sock.recv(1)#1byte recieve
        if len(bin) != 1: break
        if bin == b"f" : recieve_file(sock)
        elif bin == b"F" : recieve_file_Max(sock)
        elif bin == b"M" : recieve_message(sock)
        else:
            tcp_send_message(sock,"server receive error:" + str(bin) )
    #

# UME用で除いた箇所の部分がここに存在していた。

# ------------- Main -------------------
portnumber=59154
ip = '192.168.4.1'
server_addr =(ip, portnumber)
serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversock.bind(server_addr)
#time.sleep(3.1) # 3.1秒待機 ([UMEHOSHI ITA]の初期でUME用で不要)
#thread_id = _thread.start_new_thread(input_loop, ()) #UME用で入力用スレッドのUME用で不要
serversock.listen(1) # TCPサーバ受け入れ待機
while flag_listen_loop and sw_value == 1:
    #
    sock2, address = serversock.accept() # クライアントの接続許可
    serversock.close()
    tcp_send_message(sock2, "\nConnect!") # TCPクライアントへのメッセージ
    try: 
        receiveData( sock2 ) # 受信応答処理のループ
    except: pass
    # print(cmd) # ― ― ― ― の音で終了処理で、UME用により不要
    sock2.close()

sock2 = None
参考元のESP32のserver.pyと上記ソースの違いは、上記の赤文字の箇所だけで、問題なく動作しました。
起動時にGPIO10に繋がるスイッチを押すと、サーバとしての待ち受けループをせずにすぐ終了するように変更しています。)

そして、これを電源供給時に自動実行させるための、main.pyを次のように作ります。
import setap
import server
そして、最後に統合開発環境(IDE)のThonny(トニー)で、Raspberry Pi Pico W 内に、上記で示した setap.py、server.py、main.py の3つのファイルを書き込めば出来上りです。
(Thonny内で対象ファイルを右クリックして、アップロードメニュー項目を選択します)
これで、PCのUSB接続を外して、Raspberry Pi Pico W をUSBの電源アダプタに繋げば、クライアントソフトで遠隔操作できるはずです。
(Thonny(トニー)を使わないファイル制御が可能になります)

上記で実行したRaspberry Pi Pico W を遠隔操作するクライアント(client.py)例の紹介と、実行例

Raspberry Pi Pico W の基板に電源を電源アダプタなどに繋いで起動します。
前述の操作が終わっていれば、'PicoW-AP'のSSIDのWifiアクセスポイントが見えるはずで、'abcd1234'のパスワードで接続します。

この状態でRaspberry Pi Pico Wでは、上述のTCPサーバー(server.py)が起動しているはずです。
後は、このサーバーに対応するクライアントソフト利用して使うだけです。
この対応クライアントソフトは、何の言語を使って作っても良いのですが、 簡単に用意できる例として、Pythonで作成したコンソール用のクライアントソフト(client.py)を以下で紹介します。
import os 		# client.py
import socket
import sys
import time
import threading

path_datas="./"

# ----- 受信関連 ---------------------------------
def recieve_file(sock):
    buf=b""
    while True:
        bin = sock.recv(1)
        if len(bin) != 1: break
        buf += bin
        if buf[-2:] == b"\r\n":
            s = buf[0:-2].decode('utf-8')
            a=s.split(' ')
            filename, filesize = a[0], int(a[1])
            bin=b""
            while len(bin) < filesize:
                bin += sock.recv(filesize-len(bin))
            with open( path_datas + filename, "wb") as f:
                f.write(bin)
            #tcp_send_message(sock,filename + ":size=" + str(len(bin)))
            break
    print(filename + ":size=" + str(filesize)) # debug ---------------

def recieve_message(sock): # TCP文字列メッセージ受信
    buf=b""
    while True:
        bin = sock.recv(1)#1byte受信
        if len(bin) != 1: break
        buf += bin
        if buf[-2:] == b"\r\n":# 1行の文字列受信終了?
            s = buf[0:-2].decode('utf-8')#バイナリから
            print(s)
            break

def receiveData(sock): # 全てのTCP受信のスレッド用関数
    #print( "start receiveData" )
    while True:
        try:
            bin = sock.recv(1)#1byte受信
        except Exception as e: break
        if len(bin) != 1: break
        if bin == b"f" : recieve_file(sock)
        elif bin == b"M" : recieve_message(sock)
        else: print(bin , end="")
    #
    print( "receiveData ended." )

# ----- 送信関連 ---------------------------------
def send_file(sock, filepath):
    try:
        filesize = os.path.getsize(filepath)
        filename = os.path.basename(filepath)
        s = "f{} {}".format(filename, filesize)
        bin=(s+"\r\n").encode("utf-8")#binaryへ変換
        # print(bin, end="  ")
        sock.sendall(bin) #filename filesize 送信
        with open(filepath, "rb") as f:
            bin = f.read(filesize)
        # print("送信data:" , bin )
        sock.sendall(bin) # ファイル内容一括送信
        print("送信byte:" , len(bin) )
    except Exception as e:
        print( "send_file error:" + str(e))

def send_file_Max(sock, filepath):
    try:
        filesize = os.path.getsize(filepath)
        filename = os.path.basename(filepath)
        s = "F{} {}".format(filename, filesize)
        bin=(s+"\r\n").encode("utf-8")#binaryへ変換
        # print(bin, end="  ")
        sock.sendall(bin) #filename filesize 送信
        with open(filepath, "rb") as f:
            bin = f.read(filesize)
        # print("送信data:" , bin )
        sock.sendall(bin) # ファイル内容一括送信
        #print("送信byte:" , len(bin) )
    except Exception as e:
        print( "send_file_Max error:" + str(e))

def send_message(sock,msg):
    bin=('M'+msg+"\r\n").encode("utf-8")#binaryへ変換
    sock.sendall(bin)#一括送信
    print(f"Send:{msg}")

# ----- メイン ---------------------------------
portnumber=59154
ip='192.168.4.1'
s = input(f"IP (defualt:{ip})Address>")
if s != "": ip = s
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((ip, portnumber))
#print("接続成功")
t_id = threading.Thread(target=receiveData, args=(sock,) )
t_id.start()
time.sleep(1.0)
loopFlag = True
cmd=""
while loopFlag:
    #s = input("入力:M/F/'quit'/w/a/s/d/b/?>")
    s = input("入力:M/F/'quit'?>")
    if s != "": cmd = s
    if cmd == 'M':
        # msg = input("UME HEX Command /exc /ls -l/cat /get /del /import >")
        msg = input("ls -l/cat /get /del /import >")
        send_message(sock,msg)
        cmd=' '
    elif cmd == 'f' or cmd == 'F':
        s='F'
        print(os.listdir(path_datas))
        filename = input("送信したいファイル名入力>")
        if not os.path.isfile(path_datas + "/" + filename):
            print( filename, "no exist")
            continue
        if cmd == 'f' : send_file(sock, path_datas + "/" + filename)
        if cmd == 'F' : send_file_Max(sock, path_datas + "/" + filename)
        cmd=' '
    elif cmd == 'quit':
        send_message(sock,'end_listen_loop')
        loopFlag=False
    else:
        pass # 追加コマンド処理
    #
    time.sleep(1.0)

sock.close()
sys.exit(0)

上記コード実行例(使い方を示しています。)

上記 client.py のソースは、このリンク先で紹介したESP32用のコードと殆ど同じで赤の箇所だけ異なっています。
よって、操作はこのリンク先で紹介した使い方と同じ操作ができます。使い方はこのリンクを参照してください。

なおこのWifiを使った遠隔操作と、統合開発環境(IDE)のThonny(トニー)の併用は出来ないので、ご注意ください。
上記クライアント(client.py)の起動後に、Thonnyを起動すると、両方で実行エラーが起きsever.pyの動作は停止します。
その後、Thonnyの環境内からPico W内のsever.py を起動できます。
そして、再び python client.py の起動で いくつかのファイル操作が出来ますが、import でモジュールを実行させようとすると、
import_name Error:[Errno 98] EADDRINUSE のエラーが起きます。
自動起動のsever.pyが止まっていれば、Thonnyの環境内からファイルを起動する操作は可能です。

最後に使用状況や空状態の表示と使えるメモリ容量を表示させるソースとその実行例の一部を紹介します。
ファイルは df.py の名前で、次の内容で作り、それをWifi経由で転送、実行させます。
import server
print = server.send # printをクライアントへ出力する機能に置き換える
import os
import gc

stat = os.statvfs('/')
block_size = stat[0]  # 1ブロックのサイズ(バイト単位)
total_blocks = stat[2]  # f_blocks(総ブロック数)
free_blocks = stat[3] # 空きブロック数
print(f"block_size: {block_size} bytes, free_blocks: {free_blocks}")
free_space=block_size * free_blocks  # 空き容量(バイト単位)
print(f"Free space: {free_space} bytes")
total_size = block_size * total_blocks  # 総容量(バイト)
print(f"Total Storage: {total_size} bytes\n")

print(f"Allocatable memory size:{gc.mem_free()}") # 今どのくらいヒープが空いているかのサイズ(byte)確認

以下は、この実行例です。上記の df.py をアップロードして、それを実行しています。    が実行結果の部分です。
D:\PicoW\mywork>python client.py
IP (defualt:192.168.220.1)Address>

Connect!
入力:M/F/'quit'?>F
['main.py', 'client.py', 'df.py', 'server.py', 'setap.py', 'test.py']
送信したいファイル名入力>df.py
Server:'df.py' 760bytes received.
入力:M/F/'quit'?>M
ls -l/cat /get /del /import >import df
Send:import df
import_name:df
block_size: 4096 bytes, free_blocks: 190
Free space: 778240 bytes
Total Storage: 868352 bytes

Allocatable memory size:182224
入力:M/F/'quit'?>quit
Send:end_listen_loop
receiveData ended.

D:\PicoW\mywork>

同じソースをESP32-DevKitC-32Dでも実行しています。
比較すると、ESP32-DevKitC-32DがTotal Storage: 2097152 bytesで、実験で使ったPico Wの方が少ないようです。
対して、Allocatable memory sizeはPico Wの方が多い結果が得られました。