次のように、python3の動作環境のみでWebサーバ起動できます。この動作は、静的なフHTMLァイルに限定されます。
python -m http.server --bind 127.0.0.1 80
(-m オプションで後述のライブラリモジュールをスクリプトとして実行させています。終了は「Ctrl+C」です。)
次のスクリプト「webtest1.py」を別途に作って動作させた場合とほぼ同じ動作です。
from http.server import HTTPServer, SimpleHTTPRequestHandler host, port= '127.0.0.1', 80 httpd = HTTPServer((host, port), SimpleHTTPRequestHandler) httpd.serve_forever()GET や POST リクエストに対するレスポンスをカスタマイズするには http.server.SimpleHTTPRequestHandler を継承したクラスで do_GET や do_POST のメソッドをオーバーライドします。
from http.server import HTTPServer, SimpleHTTPRequestHandler
class MyHandler(SimpleHTTPRequestHandler):
def do_GET(self):
body = b'Hello Python HTML World'
self.send_response(200)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.send_header('Content-length', len(body))
self.end_headers()
self.wfile.write(body)
host, port= '127.0.0.1', 80
httpd = HTTPServer((host, port), MyHandler)
httpd.serve_forever()
また、WSGI(Web Server Gateway Interface) をサポートする HTTP サーバーの場合は、
をつくる場合、wsgiref モジュール(wsgiref.simple_server)を使います。
# -*- coding: utf-8 -*-
# python webserver.py
import http.server
from http.server import HTTPServer, CGIHTTPRequestHandler
class Handler(CGIHTTPRequestHandler):
cgi_directories = ["/cgi-bin"]
server_address = ("", 80)
handler_class = Handler #ハンドラを設定
my_server = HTTPServer(server_address, handler_class)
my_server.serve_forever()
これは、後述するPython web アプリを起動できます。
# -*- coding: utf-8 -*-
# app1.py の名前でファイルで作成
import cgi
import os
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
# HTML文字列
html = '''
<!DOCTYPE html>
<html>
<head>
<style>
body {{ font-size: 24pt }} /* 1つ波カッコは2つ並べて表現 */
</style>
<meta charset="UTF-8">
</head>
<body>
{}
<hr>
{}
<form action="app1.py">
お名前:<input type="text" name='name'>
<input type="submit">
</form>
</body>
</html>
'''
# フォームから値を取得
name = '未入力'
form = cgi.FieldStorage()
if 'name' in form:
name = form['name'].value
result = 'お名前:{}<br />'.format(name)
# 結果を表示
print(html.format(
'リクエストは {} です。'.format(os.environ['REQUEST_METHOD']),
result,
)
# -*- coding: utf-8 -*-
# <meta charset="UTF-8"> mysession.py
from collections import defaultdict
import dill
import os
import time
import re #正規表現のモジュール
set_cookie_SESSID=None # Set-Coolie用文字列
# sessIDの辞書取得して戻す。
def getSession( sessID ):
try:
with open(sessID, 'rb') as f:
sessDic = dill.load(f) # ロードして変数に復元
return sessDic
except: return defaultdict(str) # strの型のデフォルト値の空の辞書を返す
def saveSession( sessID, sessDic ):
with open(sessID, 'wb') as f:
dill.dump(sessDic, f)
# セッションの辞書取得
# 'SESSID'のクッキーキーの値(セッションID)があれば、そのファイルより辞書を取得する。
# 無ければ、現在時間からセッションIDを生成し、「mysession.set_cookie_SESSID」に、
# セッション書き込み文字列を設定する。
def getSessDic( usrSESSID = None):
sessID = None
if 'HTTP_COOKIE' in os.environ.keys(): # クッキーがある?
cookie=os.environ['HTTP_COOKIE'] # 取得例: 'AB=12,SESSID=15466.8484,IP=127.0.0.1'
m=re.match('.*(SESSID=)([^,]*)', cookie)
if m and m.lastindex >= 2 :
sessID = m.group(2)
if not sessID : # SESSIDのクッキーが無い?
global set_cookie_SESSID
sessID = str(time.time()) # 現在時間の文字列を取得(例:1545015666.8494184)
if usrSESSID:
sessID = usrSESSID
set_cookie_SESSID = 'Set-Cookie: SESSID=' + sessID
sessDic = getSession(sessID) # sessIDのファイルから辞書生成
return sessDic, sessID
def main():
sessDic=getSessDic()
sessDic=getSession( "123456789.12345" )
print(sessDic['AIの応答'])
sessDic['AIの応答'] = 'こんばんは'
saveSession( "123456789.12345", sessDic )
if __name__ == '__main__':
main()
上記「mysession.py」を置いた場所あるwebアプリのコード例
# -*- coding: utf-8 -*-
# <meta charset="UTF-8">
import io
import os
import sys
import cgi
import cgitb
cgitb.enable() # エラー箇所を指摘させる設定。
# HTML文字列
html = '''
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
</head>
<body>
<h1><script>
document.write("Cookie:" + document.cookie )
</script></h1>
<pre>
{}
</pre>
<form action="test.py" method="POST">
<input type="submit" name="AAA" value="押して">
</form>
</body>
</html>
'''
import mysession
sessDic,sessID = mysession.getSessDic() # セッションの辞書取得
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
form = cgi. FieldStorage()
if not sessDic['閲覧回数']:
sessDic['閲覧回数'] = "1"
else:
sessDic['閲覧回数'] = str(int(sessDic['閲覧回数']) + 1)
msg = "[<br>"
for param in os.environ.keys():
msg += "<b>%-20s</b>: %s<br>" % (param, os.environ[param])
msg += '閲覧回数:' + sessDic['閲覧回数'] + "<br>"
msg += "<br>]<br>\n"
# 結果を表示
# HTTPのレスポンスヘッダー出力
print('Content-type: text/html')
if mysession.set_cookie_SESSID : # セッションが無ければ(セッション書き込み文字列があれば?)
print( mysession.set_cookie_SESSID )
print('Set-Cookie: BB=bb')
print('Server: myServer')
print() # 空の改行
print( html.format( msg ) )# HTTPのレスポンスのボディ出力
mysession.saveSession( sessID, sessDic ) # セッション用ファイルに保存
これを実現するために必要なデスクトップ画面そ取得して、"scr.png"に保存するコード例を示す。
import pyautogui as pag # デスクトップスクリーンを制御するモジュール
import numpy as np
import cv2
img=pag.screenshot()# デスクトップ画像取得
img=np.asarray(img)
img=cv2.cvtColor(img,cv2.COLOR_RGB2BGR)
cv2.imwrite("scr.png", img)
このようにして作った"scr.png"を2秒ごとに再表示する次のHTML「index.html」を用意します。
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width">
<meta http-equiv="refresh" content="2; URL=">
<style>
body {
font-size: large;
background-color:aquamarine
}
</style>
<title>Image distribution</title>
</head>
<body>
<img src="scr.png">
</body>
</html>
これを利用すれば、Webサーバ側でデスクトップの画像を "scr.png"の名前で保存する繰り返しを行えば
デスクトップを配信する構造ができます。
# -*- coding: utf-8 -*-
'''
単にWebサーバを起動させ、同時にデスクトップの画像を1秒ごとに "scr.png"の名前で保存する別スレッドを起動している。
別途で、"scr.png"の表示用index.htmlを用意することで、デスクトップ画面を配信するサーバとなる。
'''
import http.server
from http.server import HTTPServer, CGIHTTPRequestHandler
from socketserver import ThreadingMixIn
class Handler(CGIHTTPRequestHandler):
cgi_directories = ["/cgi-bin"]
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
pass
import pyautogui as pag # デスクトップスクリーンを制御するモジュール
import threading
import numpy as np
import cv2
import time
def capture():#
count=0
while True:
try:
img=pag.screenshot() # デスクトップ画像取得
img=np.asarray(img)
img=cv2.cvtColor(img,cv2.COLOR_RGB2BGR)
cv2.imwrite("scr.png", img)
count+=1
print(count,"seconds")
except Exception as e:
print(e)
#
time.sleep(1)
#
print("終了")
t_id = threading.Thread(target=capture)
t_id.start()
server_address = ("", 3000)
handler_class = Handler #ハンドラを設定
my_webserver = ThreadedHTTPServer(server_address, handler_class)
my_webserver.serve_forever()
# MySrcTcpServer.py
import socket
import threading
import numpy as np
import cv2
import pyautogui as pag # デスクトップスクリーンを制御するモジュール
server_ip = "127.0.0.1"
server_port = 80
listen_num = 5
tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print(type(tcp_server), "のソケットオブジェクトの作成")
tcp_server.bind((server_ip, server_port))
print("ソケットにIPアドレス:{}ポート:{}を紐づける".
format(server_ip,server_port))
tcp_server.listen(listen_num)
print("listenで接続可能状態へ移行")
def rec_send(client,head,body):
client.send(head) # 送信
lst=np.frombuffer( head , dtype="int16")#2バイト、リトルエンディアンで変換
print("-------------", lst)
client.send(body) # 送信
client.close()
while True:
img=pag.screenshot()
img=np.asarray(img)
img=cv2.cvtColor(img,cv2.COLOR_RGB2BGR)
cv2.imwrite("scr.png", img)
print( img.shape )
a=np.array(img.shape, dtype="int16")
head=b''.join( a ) # bytes型(文字列に似た操作が可能)
size=img.shape[0]*img.shape[1]*img.shape[2]
body=img.reshape(size,dtype="int16")
body=body.tobytes()
client,address = tcp_server.accept()
print("クライアント接続許可:",type(client), address)
t_id = threading.Thread(target=rec_send, args=(client, head, body))
t_id.start()
# MyWebServer
#このコード内に埋め込んである所定のHTMLだけを配信するWebサーバ
import socket
import threading
server_ip = "127.0.0.1"
server_port = 80
listen_num = 30
tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print(type(tcp_server), "のソケットオブジェクトの作成")
tcp_server.bind((server_ip, server_port))
print("ソケットにIPアドレス:{}ポート:{}を紐づける".
format(server_ip,server_port))
tcp_server.listen(listen_num)
print("listenで接続可能状態へ移行")
def rec_send(args):
print("-------------------", type( args ) )
client = args
head = ""
while True:
data = client.recv(1)
head +=data.decode('utf-8')
idx = head.find("\r\n")
if head == "\r\n": break
if idx != -1 :
print( head , end="")
head = ""
#
html='''<html>
<body>
<h1>TEST</h1>
</body>
<html>
'''
send_head='''HTTP/1.1 200 OK
Content-Type: text/html
Accept-Ranges: bytes
Content-Length: {}
'''
body = html.encode('utf-8') # byteに変換
send_head=send_head.format(len(body))
send_head = send_head.encode('utf-8') # byteに変換
client.send(send_head) # 送信
print("送信")
print(send_head.decode('utf-8'))
client.send(body) # 送信
client.close()
while True:
client,address = tcp_server.accept()
print("クライアント接続許可:",type(client), address)
t_id = threading.Thread(target=rec_send, args=(client,))
t_id.start()