Skip to content

Cooperative Tasks

This guide explains the Task pattern used throughout raspbot for non-blocking, time-driven control loops on the Raspberry Pi.


The problem with time.sleep()

The naive way to do "run this every 500 ms" is:

1
2
3
while True:
    bot.leds.set_all(LedColor.GREEN)
    time.sleep(0.5)

This works for a single action -- but what if you need to read the ultrasonic sensor every 100 ms and blink the LEDs every 500 ms and check the button every 50 ms -- all at the same time?

Nested sleep() calls quickly become a scheduling nightmare, and you cannot respond to anything during a sleep.


The rate-gate pattern

The solution is borrowed from Arduino's millis() pattern:

1
2
3
4
ct = time.monotonic()
if ct - previous_time >= rate:
    previous_time = ct
    do_work()

Instead of sleeping, you check elapsed time on every iteration of a fast main loop. The main loop sleeps for only 1 ms to yield the CPU, but checks all tasks every iteration:

1
2
3
4
5
6
7
8
9
while True:
    ct = time.monotonic()
    if ct - t1 >= 0.1:
        t1 = ct
        read_sensor()
    if ct - t2 >= 0.5:
        t2 = ct
        blink_leds()
    time.sleep(0.001)

Task encapsulates the pattern

Task wraps a single function with its own rate and timer:

1
2
3
4
5
6
from raspbot.utils.task import Task

def blink(ct: float) -> None:
    bot.leds.set_all(LedColor.GREEN)

task_blink = Task(blink, rate=0.5)

Call it in the loop -- it self-throttles:

1
2
3
4
while True:
    ct = time.monotonic()
    task_blink(ct)          # only actually runs every 0.5 s
    time.sleep(0.001)

Decorator factory

Task.every() is a decorator that creates the Task in place:

1
2
3
4
5
6
7
8
@Task.every(0.5)
def task_blink(ct: float) -> None:
    bot.leds.set_all(LedColor.GREEN)

@Task.every(0.1)
def task_sense(ct: float) -> None:
    global distance
    distance = bot.ultrasonic.read_cm()

After decoration task_blink and task_sense are Task instances, ready to call.


Full example -- multiple concurrent tasks

import time
from raspbot import Robot, Task
from raspbot.types import LedColor

with Robot() as bot:
    bot.ultrasonic.enable()
    distance = 0.0
    led_idx = 0

    @Task.every(0.1)
    def read_sensor(ct: float) -> None:
        global distance
        distance = bot.ultrasonic.read_cm()

    @Task.every(0.5)
    def cycle_leds(ct: float) -> None:
        global led_idx
        colors = list(LedColor)
        bot.leds.set_all(colors[led_idx % len(colors)])
        led_idx += 1

    @Task.every(0.2)
    def check_button(ct: float) -> None:
        if bot.button.is_pressed():
            bot.buzzer.beep(0.05)

    @Task.every(1.0)
    def print_status(ct: float) -> None:
        print(f"dist={distance:.1f}cm")

    end = time.monotonic() + 30.0
    while time.monotonic() < end:
        ct = time.monotonic()
        read_sensor(ct)
        cycle_leds(ct)
        check_button(ct)
        print_status(ct)
        time.sleep(0.001)

Key properties of Task

run_immediately

By default (run_immediately=True) the task runs on the very first call. Set run_immediately=False to wait for one full rate period first:

task = Task(fn, rate=1.0, run_immediately=False)

Dynamic rate

Change the rate at runtime:

task.rate = 0.1   # speed up
task.rate = 2.0   # slow down

Reset

Force the task to fire on the next call:

task.reset()

Timing accuracy

Task uses time.monotonic() which is not affected by wall-clock adjustments. Timing accuracy is limited by:

  1. The time.sleep(0.001) at the bottom of the loop (1 ms granularity)
  2. I2C bus latency for sensor reads (~1-3 ms per transaction)
  3. Python GIL contention if you use threads alongside the loop

For typical robot control rates (10-50 Hz) the accuracy is more than sufficient.


Buzzer and LightEffects in the tick loop

Both Buzzer and LightEffects follow the same cooperative pattern. They have their own internal state machines driven by update(ct) -- you never call time.sleep() inside them.

Buzzer

Schedule work with beep() or pattern(); advance state with update():

import time
from raspbot import Robot

with Robot() as bot:
    # Schedule 3 short beeps -- returns immediately
    bot.buzzer.pattern(on_time=0.1, off_time=0.1, count=3)

    while bot.buzzer.is_active:
        bot.buzzer.update(time.monotonic())
        time.sleep(0.001)

LightEffects

Start an effect with start_*(), advance it with update(), stop with stop():

import time
from raspbot import Robot
from raspbot.types import LedColor

with Robot() as bot:
    bot.light_effects.start_breathing(LedColor.CYAN, speed=0.01)

    end = time.monotonic() + 10.0
    while time.monotonic() < end:
        bot.light_effects.update(time.monotonic())
        time.sleep(0.001)

    bot.light_effects.stop()

Combined loop

Drive multiple state machines from the same ct:

import time
from raspbot import Robot
from raspbot.types import LedColor
from raspbot.utils.task import Task

with Robot() as bot:
    bot.light_effects.start_river(speed=0.05)

    # Alert beep every 5 seconds
    def alert(ct: float) -> None:
        bot.buzzer.pattern(0.05, 0.05, 2)

    task_alert = Task(alert, rate=5.0)

    end = time.monotonic() + 30.0
    while time.monotonic() < end:
        ct = time.monotonic()
        bot.buzzer.update(ct)
        bot.light_effects.update(ct)
        task_alert(ct)
        time.sleep(0.001)

    bot.light_effects.stop()
    bot.buzzer.off()

See also