家庭菜園ロガーの製作:ソフトウェア(マイコン側)

開発環境

 Raspberry Pi Pico W のソフトウェア開発環境は、主に3つある。

 最初、今までマイコンのプログラミングで使い慣れた Arduino IDE で開発を始めたのだが、消費電力を抑えるためのディープスリープを実装しようとして、行き詰まってしまった。現状 (2024/03時点) では、Arduino 環境でディープスリープを実装するのは結構大変らしい。自分でArduino coreを再ビルドする必要がある。(参考: GitHub - earlephilhower/arduino-pico - Low Power Modes #345

 「何でもできる」という点では C/C++ SDK が一番だが、少しハードルが高いと感じた。今回は MicroPython を使うことにした。手順は下の通り。

 Lチカをやってみる。下のスクリプトをコード部分に書き込む。

import machine,utime
led = machine.Pin('LED', machine.Pin.OUT)
while True:
    led.value(1)
    utime.sleep(0.5)
    led.value(0)
    utime.sleep(0.5)

 「実行」ボタンを押すと、スクリプトが実行される。ボード上の緑色LEDが点滅する。

 PC と接続せずに、単体でスクリプトを実行させるには、Pico W の本体に main.py という名称でスクリプトを保存しておく。Pico W が接続されている状態で(スクリプトを実行している時は STOP ボタンで止めてから)、Thonny から「名前をつけて保存」する。下のような表示が出る。"Raspberry Pi Pico W" を選ぶ。

 現在保存されているファイルが表示される(下の図はまだ何も保存していない状態)。ファイル名を main.py として保存する。

土壌水分センサー

 土壌水分センサーは、単に ADC を読むだけ。電源を供給するため、GPIO18 を操作する。

from machine import ADC, Pin
import time
p1_vcc = Pin(18, Pin.OUT)
p1_vcc.on()
adc0 = ADC(Pin(26))
adc1 = ADC(Pin(27))
while True:
    time.sleep(2)
    a0 = adc0.read_u16()//64
    a1 = adc1.read_u16()//64
    print('{} {}'.format(a0, a1))

温湿度センサー

 温湿度センサーの DHT22 は、MicroPython でサポートされている。参考:「Raspberry Pi Pico: DHT11/DHT22 Temperature and Humidity Sensor (MicroPython)」(Random Nerd Tutorials, 2023/09/12)

 コードは非常に簡単。読むのに失敗することが時々あるようなので、読み取り部分を try...except で囲んでおく必要がある。

from machine import Pin
from time import sleep
import dht 
sensor = dht.DHT22(Pin(20))
while True:
  try:
    sleep(2)
    sensor.measure()
    temp = sensor.temperature()
    hum = sensor.humidity()
    print('{:3.1f}°C {:3.1f}%'.format(temp, hum))
  except OSError as e:
    print('Failed to read sensor.')

電源電圧

 Raspberry Pi Pico W では、VSYS を 1/3 に抵抗分圧したものが ADC3 につながっている。つまり、ADC3 を読み取れば電源電圧のモニタリングができる。ただ、Pico W ではこの値の取得にクセがあって、要注意。

 Pico W では、ADC3 の線が無線通信モジュールのクロックと共用されており、GPIO25 でそれを切り替えるようになっている(GPIO25 = high なら ADC3, low なら通信モジュール)。また、Pico W では、基板上の LED が通信モジュールにつながっているので、基板上の LED を使う設定にすると、同時に通信モジュールが ON になってしまう。そこで、起動後、無線通信とLEDを使う「前」に、GPIO25を1にして、GPIO29 (=ADC3) を入力モードにしてADC3から値を読めばよい。

#  Read VSYS/3 from ADC3
from machine import ADC, Pin
import time
Pin(25, Pin.OUT).high()
Pin(29, Pin.IN)
adc3 = ADC(3)
time.sleep(0.5)
adc3_value = adc3.read_u16() * 3 * 3.3 / 65535

無線LANへの接続

 取得したデータをサーバへ送信するため、無線LANに接続する。Pico W のプログラム例を見ると、ソケットを開いて send(), recv() でデータ通信を明示的に書いている例が多いが、Python の urequests というクラスを使う方が簡単に書ける。

 無線LANに接続する前に、無線通信モジュールをオンにする必要がある。これは GPIO23 を High にすることで行う。

import network
import urequests
import time
#  Network setup
ssid = "SSID"
password = "password"
server_address = "nnn.nnn.nnn.nnn"
port = 80
base_url = "http://{}:{}/..../".format(server_address, port)
url = base_url + "cgi-bin/post.py"
#  Connect to WiFi
Pin(23, Pin.OUT).high()
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
print("Connecting to WiFi...")
wlan.connect(ssid, password)
while not wlan.isconnected():
    time.sleep(5)
    print("Trying to connect...")
print("WiFi connected as {}".format(wlan.ifconfig()[0]))
#  Post data to server
qstr = "a0={}&a1={}&a2={:.2f}&temp={:.1f}&humid={:.1f}".format(a0, a1, adc3_value, temp, hum)
response = urequests.get(cgi_url + "?" + qstr)
if response.status_code == 200:
    print("post.py succeeded.")
else:
    print("post.py failed, status: {}".format(response.status_code))
#  Disconnect from WiFi
wlan.disconnect()
while wlan.isconnected():
    time.sleep(1)
print("WiFi disconnected.")

自動アップデート

 サーバに新しいバージョンのスクリプトがある時、それをダウンロードして main.py として保存する。リブート後、新しいバージョンのスクリプトで動作させることができる。

 下のコード例では、サーバに main.pyversion.json を置いておく。サーバからまず version.json をダウンロードして、Pico W 内部に保存されている version.json と比較し、バージョン番号が大きくなっていれば、main.py をダウンロードして、内部に保存し、machine.reset() でリブートする。

def auto_update():
    #  Fetch the version info
    response = urequests.get(version_url)
    if response.status_code != 200:
        print("Cannot fetch the version info: status {}".format(response.status_code))
    else:
        print("Version info has been successfully fetched: {}".format(response.text))
        version_text = response.text
        version = json.loads(version_text)['version']
        print("Latest version is {}".format(version))
        current_version = None
        try:
            with open('version.json') as f:
                current_version = json.load(f)['version']
        except:
            pass
        print("Current version is {}".format(current_version))
        if current_version == None or current_version < version:
            response = urequests.get(firmware_url)
            if response.status_code == 200:
                print("New firmware has been successfully fetched (length {})".format(len(response.text)))
                latest_code = response.text
                #  Save the current version
                try:
                    print("Saving the new firmware...")
                    with open('latest_code.py', 'w') as f:
                        f.write(latest_code)
                    print("Saving the new version...")
                    with open('version.json', 'w') as f:
                        json.dump({'version': version}, f)
                    #  Overwrite the old code
                    print("Overwriting the old firmware...")
                    os.rename('latest_code.py', 'main.py')
                except:
                    print("Error occurred during the auto-update.")
                    pass
                print("Restarting device... (don't worry about an error message after this")
                time.sleep(0.25)
                machine.reset()
            else:
                print("Cannot fetch the latest firmware: status {}".format(response.status_code))
    del response

 当然のことながら、アップデート後の main.py がエラーを起こすと、何もできなくなる。この時は仕方がないので、本体を PC につないで、アップデートし直す。自動アップデートの前に、動作チェックをしておく必要がある。(今手元に Pico W が1枚しかないので、事前チェックができないんだよね……もう1枚買わなきゃ。)

ディープスリープ

 MicroPython では、ディープスリープは machine.deepsleep(ミリ秒) で可能。ディープスリープから起きるときはリセットがかかり、ラズピコ上のスクリプトが先頭から実行される。

 Thornyで開発中は、ラズピコ上のスクリプトを更新せずに実行している時があるので、ディープスリープ後に古いスクリプトが実行されて「???」となることがある。実行前に必ずラズピコ上の main.py を更新するように、注意すること。

 ディープスリープの目的は消費電流を小さくすることだが、注意点がいくつかある。まず、ディープスリープ中、GPIO の出力は維持される。つまり、土壌水分センサーの電源を制御している GPIO18 は、ディープスリープに入る前に Low にしないといけない。また、無線通信モジュールの電源をオフにするため、GPIO23 も Low にする必要がある。

 無線LANで温湿度・土壌水分・電源電圧のデータをサーバに送り、その後 60 秒間ディープスリープする、というスクリプトで、消費電流を実測してみた。

 ディープスリープ中の電流値はほぼ 2 mA。先行例(「ディープスリープでRaspberry Pi Pico Wを低電力化する」、さとやまノート 2023/04/06)でも同程度だったので、良しとする。

 3サイクル分の平均値をとると、9.6 mA だった。1サイクルあたりの時間は68.3秒だった。実際の運用は待機時間を5分にする。待機中の電流を 2.0 mAとすると、待機時間を5分にしたときの平均電流は(9.6*68.3+2.0*240)/(68.3+240)=3.7 mAとなる。

全ソースコード

 上で解説していない部分もありますが、ソースを解読してください。

import network
import urequests
import socket
import json
import time
import os
import gc
import dht 
from machine import ADC, Pin
from machine import mem32

#  Read VSYS/3 from ADC3
Pin(25, Pin.OUT).high()
Pin(29, Pin.IN)
adc3 = ADC(3)
time.sleep(0.5)
adc3_value = adc3.read_u16() * 3 * 3.3 / 65535

#  Network setup
ssid = "SSID"
password = "password"
server_address = "nnn.nnn.nnn.nnn"
url = "/cgi-bin/post.py"
port = 80
base_url = "http://{}:{}/iot/".format(server_address, port)
cgi_url = base_url + "cgi-bin/"
channel = 1
update_url = base_url + str(channel) + "/"
version_url = update_url + "version.json"
firmware_url = update_url + "main.py"

#  Temperature/humidity sensor setup
sensor = dht.DHT22(Pin(20))

#  Soil moisture setup
p1_vcc = Pin(18, Pin.OUT)
adc0 = ADC(Pin(26))
adc1 = ADC(Pin(27))

#  LED pin setup
led = machine.Pin('LED', machine.Pin.OUT)

try_update = True

def auto_update():
    #  Fetch the version info
    response = urequests.get(version_url)
    if response.status_code != 200:
        print("Cannot fetch the version info: status {}".format(response.status_code))
    else:
        print("Version info has been successfully fetched: {}".format(response.text))
        version_text = response.text
        version = json.loads(version_text)['version']
        print("Latest version is {}".format(version))
        current_version = None
        try:
            with open('version.json') as f:
                current_version = json.load(f)['version']
        except:
            pass
        print("Current version is {}".format(current_version))
        if current_version == None or current_version < version:
            response = urequests.get(firmware_url)
            if response.status_code == 200:
                print("New firmware has been successfully fetched (length {})".format(len(response.text)))
                latest_code = response.text
                #  Save the current version
                try:
                    print("Saving the new firmware...")
                    with open('latest_code.py', 'w') as f:
                        f.write(latest_code)
                    print("Saving the new version...")
                    with open('version.json', 'w') as f:
                        json.dump({'version': version}, f)
                    #  Overwrite the old code
                    print("Overwriting the old firmware...")
                    os.rename('latest_code.py', 'main.py')
                except:
                    print("Error occurred during the auto-update.")
                    pass
                response = urequests.get(cgi_url + "post.py?comment=Firmware%20updated:%20{}%20->%20{}".format(current_version, version))
                print("Restarting device... (don't worry about an error message after this")
                time.sleep(0.25)
                machine.reset()
            else:
                print("Cannot fetch the latest firmware: status {}".format(response.status_code))
    del response

while True:

    led.value(1)
    p1_vcc.on()

    #  Measure temperature and humidity
    retry = True
    count = 0
    temp = -10.0
    hum = 0
    while retry:
        try:
            sensor.measure()
            temp = sensor.temperature()
            hum = sensor.humidity()
            retry = False
        except OSError as e:
            count += 1
            if count >= 10:
                temp = -10.0
                hum = 0
                break
            time.sleep(1)
    
    #  Connect to WiFi
    Pin(23, Pin.OUT).high()
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    print("Connecting to WiFi...")
    wlan.connect(ssid, password)
    count = 0
    while not wlan.isconnected():
        time.sleep(5)
        print("Trying to connect...")
        count += 1
        if count >= 24:
            break
    led.value(0)
    if count >= 24:
        print("Cannot connect to Wifi. Sleep for a while, and retry.")
        machine.deepsleep(5*60000)
        continue   #  This line will never executed

    ip = wlan.ifconfig()[0]
    print("Connected as {}".format(ip))
    
    #  Try auto update
    if try_update:
        auto_update()
        try_update = False

    #  Measure soil moisture
    #  (This is done after connecting to WiFi, because it is better to wait after 
    #  turning on the Vcc of the sensor)
    a0 = adc0.read_u16() // 64
    a1 = adc1.read_u16() // 64
    
    print("a0:{} a1:{} a2:{:.2f} temp:{:.1f}°C humid:{:.1f}% channel:{}".format(a0, a1, bat_v, temp, hum, channel))
    qstr = "?a0={}&a1={}&a2={:.2f}&temp={:.1f}&humid={:.1f}&ch={}".format(a0, a1, adc3_value, temp, hum, channel)

    response = urequests.get(cgi_url + "post.py" + qstr)
    if response.status_code == 200:
        print("post.py succeeded.")
    else:
        print("post.py failed, status: {}".format(response.status_code))
    wlan.disconnect()
    while wlan.isconnected():
        time.sleep(1)
    print("WiFi disconnected.")
    wlan.active(False)
    p1_vcc.off()
    
    del wlan, ip, qstr, response
    gc.collect()

    Pin(23, Pin.OUT).low()
    Pin(25, Pin.OUT).low()
    machine.deepsleep(5*60000)