Friday, October 07, 2005
Protothreads
Protothreads are extremely lightweight stackless threads designed for severely memory constrained systems, such as small embedded systems or wireless sensor network nodes. Protothreads provide linear code execution for event-driven systems implemented in C. Protothreads can be used with or without an underlying operating system.
Example protothreads code.
#include "pt.h"
struct pt pt;
struct timer timer;
PT_THREAD(example(struct pt *pt))
{
  PT_BEGIN(pt);
  while(1) {
    if(initiate_io()) {
      timer_start(&timer);
      PT_WAIT_UNTIL(pt,
                    io_completed() ||
                    timer_expired(&timer));
      read_data();
    }
  }
  PT_END(pt);
}
Protothreads provide a blocking context on top of an event-driven system, without the overhead of per-thread stacks. The purpose of protothreads is to implement sequential flow of control without complex state machines or full multi-threading.
Although protothreads were originally created for memory-constrained embedded systems, they have found many uses as a general-purpose library too. Examples include multimedia streaming server software, grid computing research software, and MPEG decoding software for Internet TVs.
Main features:
- Very small RAM overhead - only two bytes per protothread and no extra stacks
- Highly portable - the protothreads library is 100% pure C, with no architecture-specific assembly code
- Can be used with or without an OS
- Provides blocking wait without full multi-threading or stack-switching
- Freely available under a BSD-like open source license
Example applications:
- Memory constrained systems
- Event-driven protocol stacks
- Small embedded systems
- Sensor network nodes
- Portable C applications
"Many of us use the switch/case construct to explicitly implement concurrent state machines in our code. The [protothread] macros merely provide a level of abstraction above that so that the code appears more linear and the overall logic more visible." Dan Henry summarizes how protothreads work and how they are used in a discussion about protothreads in the 8052 message board at www.8052.com.
Protothreads are extremely lightweight, stackless threads that provide a blocking context on top of an event-driven system, without the overhead of per-thread stacks. The purpose of protothreads is to implement sequential flow of control without using complex state machines or full multi-threading. Protothreads provide conditional blocking inside a C function.
In memory-constrained systems, such as deeply embedded systems, traditional multi-threading may have too large a memory overhead. In traditional multi-threading, each thread requires its own stack, which is typically over-provisioned. The stacks may use large parts of the available memory.
The main advantage of protothreads over ordinary threads is that protothreads are very lightweight: a protothread does not require its own stack. Rather, all protothreads run on the same stack and context switching is done by stack rewinding. This is advantageous in memory-constrained systems, where a stack for a thread might use a large part of the available memory. A protothread requires only two bytes of memory. Moreover, protothreads are implemented in pure C and do not require any machine-specific assembler code.
A protothread runs within a single C function and cannot span over other functions. A protothread may call normal C functions, but cannot block inside a called function. Blocking inside nested function calls is instead achieved by spawning a separate protothread for each potentially blocking function. The advantage of this approach is that blocking is explicit: the programmer knows exactly which functions may block and which functions cannot.
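The spawning pattern described above can be sketched roughly as follows. This is a minimal illustration, not the library's code: the MINI_* macros are hypothetical cut-down stand-ins for pt.h, and read_byte, read_pair and input are names invented for the example. The parent blocks by re-running the child on every call until the child reports that it has ended.

```c
/* Hypothetical cut-down macros standing in for pt.h (an assumption). */
struct mini_pt { unsigned short lc; };
enum { MINI_WAITING = 0, MINI_ENDED = 1 };

#define MINI_INIT(p)   ((p)->lc = 0)
#define MINI_BEGIN(p)  switch((p)->lc) { case 0:
#define MINI_END(p)    } (p)->lc = 0; return MINI_ENDED
#define MINI_WAIT_UNTIL(p, cond)               \
  do {                                         \
    (p)->lc = __LINE__; case __LINE__:         \
    if(!(cond)) { return MINI_WAITING; }       \
  } while(0)
/* Start a child protothread and block the parent until it ends. */
#define MINI_SPAWN(p, child, call)             \
  do {                                         \
    MINI_INIT(child);                          \
    MINI_WAIT_UNTIL(p, (call) == MINI_ENDED);  \
  } while(0)

static int input = -1;   /* -1 means "no byte available" */

/* Child protothread: blocks until a byte is available, then takes it. */
static int read_byte(struct mini_pt *p, int *out)
{
  MINI_BEGIN(p);
  MINI_WAIT_UNTIL(p, input >= 0);
  *out = input;
  input = -1;
  MINI_END(p);
}

/* Parent protothread: the two potentially blocking reads are spawned
   as child protothreads instead of being called as plain functions. */
static int read_pair(struct mini_pt *p, int *sum)
{
  static struct mini_pt child;
  static int a, b;
  MINI_BEGIN(p);
  MINI_SPAWN(p, &child, read_byte(&child, &a));
  MINI_SPAWN(p, &child, read_byte(&child, &b));
  *sum = a + b;
  MINI_END(p);
}

/* Drives the parent by hand, feeding one byte between calls. */
int demo_spawn(void)
{
  struct mini_pt p;
  int sum = 0;
  MINI_INIT(&p);
  input = -1;
  if(read_pair(&p, &sum) != MINI_WAITING) { return -1; }
  input = 3;
  if(read_pair(&p, &sum) != MINI_WAITING) { return -1; }
  input = 4;
  if(read_pair(&p, &sum) != MINI_ENDED) { return -1; }
  return sum;
}
```

In this sketch the only places that can block are the wait and spawn statements inside the two protothread functions, which is what keeps blocking explicit.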
Protothreads are similar to asymmetric co-routines. The main difference is that co-routines use a separate stack for each co-routine, whereas protothreads are stackless. The mechanism most similar to protothreads is the Python generator. Generators are also stackless constructs, but have a different purpose: protothreads provide blocking contexts inside a C function, whereas Python generators provide multiple exit points from a generator function.
Local variables
Because protothreads do not save the stack context across a blocking call, automatic local variables are not preserved when the protothread blocks. Local variables should therefore be used with the utmost care: if in doubt, do not use them inside a protothread, or declare them static as the code lock example further down does.
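One way to keep state across a blocking wait is to store it next to the protothread's own state, much as the LCD example further down does with its struct state. The sketch below assumes hypothetical cut-down MINI_* macros in place of pt.h, and counter_state and count_events are names made up for the illustration. Because the loop counter lives in the struct, two instances of the same protothread function do not interfere with each other.

```c
/* Hypothetical cut-down macros standing in for pt.h (an assumption). */
struct mini_pt { unsigned short lc; };
enum { MINI_WAITING = 0, MINI_ENDED = 1 };

#define MINI_BEGIN(p)  switch((p)->lc) { case 0:
#define MINI_END(p)    } (p)->lc = 0; return MINI_ENDED
#define MINI_WAIT_UNTIL(p, cond)               \
  do {                                         \
    (p)->lc = __LINE__; case __LINE__:         \
    if(!(cond)) { return MINI_WAITING; }       \
  } while(0)

struct counter_state {
  struct mini_pt pt;
  int event;   /* set by the caller when something happened */
  int i;       /* loop counter: kept here, not on the stack */
  int seen;    /* number of events handled so far */
};

static int count_events(struct counter_state *s)
{
  /* An automatic "int i;" declared here would be a fresh variable on
     every call, so its value after a blocking wait is indeterminate. */
  MINI_BEGIN(&s->pt);
  for(s->i = 0; s->i < 3; ++s->i) {
    MINI_WAIT_UNTIL(&s->pt, s->event);
    s->event = 0;   /* consume the event */
    ++s->seen;
  }
  MINI_END(&s->pt);
}

/* Runs two independent instances and checks that each kept its own
   counter. Returns 0 on success. */
int demo_state(void)
{
  struct counter_state a = {{0}, 0, 0, 0};
  struct counter_state b = {{0}, 0, 0, 0};
  int ra, rb;
  count_events(&a);                        /* both block on the first wait */
  count_events(&b);
  a.event = 1; count_events(&a);           /* a handles two events ... */
  a.event = 1; ra = count_events(&a);
  b.event = 1; rb = count_events(&b);      /* ... while b handles one */
  if(ra != MINI_WAITING || rb != MINI_WAITING) { return -1; }
  return (a.seen == 2 && a.i == 2 && b.seen == 1 && b.i == 1) ? 0 : -2;
}
```

A static variable would also survive the wait, but it would be shared by every instance of the function, so the struct approach is usually the safer choice when more than one instance may run.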
Scheduling
A protothread is driven by repeated calls to the function in which the protothread is running. Each time the function is called, the protothread will run until it blocks or exits. Thus the scheduling of protothreads is done by the application that uses protothreads.
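As a rough illustration of application-level scheduling, the sketch below runs two protothreads round-robin from a plain loop. The MINI_* macros are hypothetical cut-down stand-ins for pt.h, and ping, pong and run_schedule are invented names; a real application would typically call its protothread functions from its main loop or from event handlers in a similar way.

```c
/* Hypothetical cut-down macros standing in for pt.h (an assumption). */
struct mini_pt { unsigned short lc; };
enum { MINI_WAITING = 0, MINI_ENDED = 1 };

#define MINI_INIT(p)   ((p)->lc = 0)
#define MINI_BEGIN(p)  switch((p)->lc) { case 0:
#define MINI_END(p)    } (p)->lc = 0; return MINI_ENDED
#define MINI_WAIT_UNTIL(p, cond)               \
  do {                                         \
    (p)->lc = __LINE__; case __LINE__:         \
    if(!(cond)) { return MINI_WAITING; }       \
  } while(0)

static int turn;            /* 0: ping may run, 1: pong may run */
static char trace[8];       /* records the order in which steps ran */
static int trace_len;

static int ping(struct mini_pt *p)
{
  static int n;
  MINI_BEGIN(p);
  for(n = 0; n < 3; ++n) {
    MINI_WAIT_UNTIL(p, turn == 0);
    trace[trace_len++] = 'a';
    turn = 1;
  }
  MINI_END(p);
}

static int pong(struct mini_pt *p)
{
  static int n;
  MINI_BEGIN(p);
  for(n = 0; n < 3; ++n) {
    MINI_WAIT_UNTIL(p, turn == 1);
    trace[trace_len++] = 'b';
    turn = 0;
  }
  MINI_END(p);
}

/* The "scheduler": call each protothread function repeatedly until it
   has ended. Each call runs the protothread until it blocks or exits. */
const char *run_schedule(void)
{
  struct mini_pt p1, p2;
  int done1 = 0, done2 = 0;
  MINI_INIT(&p1);
  MINI_INIT(&p2);
  turn = 0;
  trace_len = 0;
  while(!done1 || !done2) {
    /* A finished protothread is skipped; calling it again would
       restart it from the beginning in this sketch. */
    if(!done1) { done1 = (ping(&p1) == MINI_ENDED); }
    if(!done2) { done2 = (pong(&p2) == MINI_ENDED); }
  }
  trace[trace_len] = '\0';
  return trace;
}
```

With these definitions run_schedule() returns the string "ababab": each protothread runs until its wait condition is false, returns to the loop, and picks up where it left off on the next call.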
Implementation
Protothreads are implemented using local continuations. A local continuation represents the current state of execution at a particular place in the program, but does not provide any call history or local variables. A local continuation can be set in a specific function to capture the state of the function. After a local continuation has been set, it can be resumed in order to restore the state of the function at the point where the local continuation was set.
Local continuations can be implemented in a variety of ways:
- by using machine specific assembler code,
- by using standard C constructs, or
- by using compiler extensions.
The first way works by saving and restoring the processor state, except for stack pointers, and requires between 16 and 32 bytes of memory per protothread. The exact amount of memory required depends on the architecture.
The standard C implementation requires only two bytes of state per protothread and uses the C switch() statement in a non-obvious way that is similar to Duff's device and heavily inspired by Simon Tatham's Coroutines in C. This implementation does, however, impose a slight restriction on the code that uses protothreads: the code cannot use switch() statements itself.
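The switch() trick can be sketched in a few lines. What follows is an illustration of the idea under stated assumptions, not the contents of pt.h: all MINI_* names are hypothetical, and the state is an unsigned short, which is two bytes on most platforms. Setting a continuation stores the current line number and places a case label at the same spot; resuming is a switch() on the stored number.

```c
/* Local continuation layer (hypothetical names, for illustration). */
typedef unsigned short mini_lc_t;
#define MINI_LC_INIT(lc)    ((lc) = 0)
#define MINI_LC_RESUME(lc)  switch(lc) { case 0:
#define MINI_LC_SET(lc)     (lc) = __LINE__; case __LINE__:
#define MINI_LC_END(lc)     }

/* Protothread layer built on top of the local continuation. */
struct mini_pt { mini_lc_t lc; };
enum { MINI_WAITING = 0, MINI_ENDED = 1 };
#define MINI_INIT(p)   MINI_LC_INIT((p)->lc)
#define MINI_BEGIN(p)  MINI_LC_RESUME((p)->lc)
#define MINI_END(p)    MINI_LC_END((p)->lc) MINI_LC_INIT((p)->lc); return MINI_ENDED
#define MINI_WAIT_UNTIL(p, cond)               \
  do {                                         \
    MINI_LC_SET((p)->lc)                       \
    if(!(cond)) { return MINI_WAITING; }       \
  } while(0)

static int ready;    /* condition the protothread blocks on */
static int starts;   /* how often the code before the wait ran */
static int done;     /* how often the code after the wait ran */

static int worker(struct mini_pt *p)
{
  MINI_BEGIN(p);              /* switch(p->lc) { case 0:           */
  ++starts;
  MINI_WAIT_UNTIL(p, ready);  /* p->lc = <line>; case <line>: ...  */
  ++done;
  MINI_END(p);                /* } p->lc = 0; return MINI_ENDED;   */
}

/* Returns 0 if the protothread resumed at the wait rather than at the
   top of the function. */
int demo_switch(void)
{
  struct mini_pt p;
  MINI_INIT(&p);
  ready = starts = done = 0;
  if(worker(&p) != MINI_WAITING) { return -1; }  /* runs to the wait */
  if(worker(&p) != MINI_WAITING) { return -2; }  /* jumps to the case label */
  if(starts != 1 || done != 0) { return -3; }
  ready = 1;
  if(worker(&p) != MINI_ENDED) { return -4; }
  return (starts == 1 && done == 1) ? 0 : -5;
}
```

Because the body of the protothread sits inside one large switch(), a switch() of the user's own around a blocking statement would capture the generated case label, which is where the restriction mentioned above comes from.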
Certain compilers have C extensions that can be used to implement protothreads. GCC supports label pointers (its "labels as values" extension) that can be used for this purpose. With this implementation, protothreads require 4 bytes of RAM per protothread.
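A sketch of the label-pointer variant is shown below. It relies on the GCC extension (&&label and goto *ptr), so it is not standard C, and the LBL_* names are hypothetical rather than taken from the library. The stored state is a code address, so its size is that of a void * on the target: 4 bytes on a 32-bit system, more on a 64-bit host.

```c
#include <stddef.h>

/* Hypothetical macros; requires the GCC "labels as values" extension. */
struct lbl_pt { void *lc; };   /* address of the label to resume at */
enum { LBL_WAITING = 0, LBL_ENDED = 1 };

#define LBL_CAT2(a, b)  a##b
#define LBL_CAT(a, b)   LBL_CAT2(a, b)

#define LBL_INIT(p)   ((p)->lc = NULL)
#define LBL_BEGIN(p)  do { if((p)->lc != NULL) { goto *(p)->lc; } } while(0)
#define LBL_END(p)    (p)->lc = NULL; return LBL_ENDED
#define LBL_WAIT_UNTIL(p, cond)                   \
  do {                                            \
    LBL_CAT(lbl_resume_, __LINE__):               \
    (p)->lc = &&LBL_CAT(lbl_resume_, __LINE__);   \
    if(!(cond)) { return LBL_WAITING; }           \
  } while(0)

static int cmd;      /* pending command, 0 means none */
static int result;

static int worker(struct lbl_pt *p)
{
  LBL_BEGIN(p);
  LBL_WAIT_UNTIL(p, cmd != 0);
  /* No enclosing switch() is generated by these macros, so an
     ordinary switch() can be used in the body. */
  switch(cmd) {
  case 1:  result = 10; break;
  default: result = -1; break;
  }
  LBL_END(p);
}

/* Blocks once with no command pending, then completes with command 1. */
int demo_labels(void)
{
  struct lbl_pt p;
  LBL_INIT(&p);
  cmd = 0;
  result = 0;
  if(worker(&p) != LBL_WAITING) { return -1; }
  cmd = 1;
  if(worker(&p) != LBL_ENDED) { return -2; }
  return result;
}
```

The trade-off in this sketch is portability and a larger per-protothread state in exchange for lifting the switch() restriction of the standard C version.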
Examples:
- A reliable communication protocol
- Delays - text scrolling on an LCD panel
- A code lock
- The bounded buffer with protothread semaphores
- A radio driver written both with protothreads and events
A reliable communication protocol
The example shows two protothreads, one for the sender and one for the receiver.
#include "pt.h"
#include "timer.h"
/* Retransmission timer used by the sender (declaration added so that
   the fragment is complete; TIMEOUT is assumed to be defined elsewhere). */
static struct timer timer;
PT_THREAD(sender(struct pt *pt))
{
  PT_BEGIN(pt);
  do {
    send_packet();
    /* Wait until an acknowledgment has been received, or until the
       timer expires. If the timer expires, we should send the packet
       again. */
    timer_set(&timer, TIMEOUT);
    PT_WAIT_UNTIL(pt, acknowledgment_received() ||
                      timer_expired(&timer));
  } while(timer_expired(&timer));
  PT_END(pt);
}
PT_THREAD(receiver(struct pt *pt))
{
  PT_BEGIN(pt);
  /* Wait until a packet has been received, and send an
     acknowledgment. */
  PT_WAIT_UNTIL(pt, packet_received());
  send_acknowledgement();
  PT_END(pt);
}
Delays - text scrolling on an LCD panel
#include <string.h>
#include "pt.h"
#include "timer.h"
struct state {
  char *text;
  char *scrollptr;
  struct pt pt;
  struct timer timer;
};
PT_THREAD(display_text(struct state *s))
{
  PT_BEGIN(&s->pt);
  /* If the text is shorter than the display size, show it right
     away. */
  if(strlen(s->text) <= LCD_SIZE) {
    lcd_display_text(s->text);
  } else {
    /* If the text is longer than the display, we should scroll in the
       text from the right with a delay of one second per scroll
       step. We do this in a for() loop, where the loop variable is
       the pointer to the first character to be displayed. */
    for(s->scrollptr = s->text;
        strlen(s->scrollptr) > LCD_SIZE;
        ++s->scrollptr) {
      lcd_display_text(s->scrollptr);
      /* Wait for one second. */
      timer_set(&s->timer, ONE_SECOND);
      PT_WAIT_UNTIL(&s->pt, timer_expired(&s->timer));
    }
  }
  PT_END(&s->pt);
}
A code lock
The code uses external functions for timers and for checking the keyboard. These functions are included in the full source code for this example, which is included in the downloadable tarball.
/*
 * This is the code that has to be entered.
 */
static const char code[4] = {'1', '4', '2', '3'};
/*
 * Declaration of the protothread function implementing the code lock
 * logic. The protothread function is declared using the PT_THREAD()
 * macro. The function is declared with the "static" keyword since it
 * is local to this file. The name of the function is codelock_thread
 * and it takes one argument, pt, a pointer to a struct pt.
 */
static
PT_THREAD(codelock_thread(struct pt *pt))
{
  /* This is a local variable that holds the number of keys that have
   * been pressed. Note that it is declared with the "static" keyword
   * to make sure that the variable is *not* allocated on the stack.
   */
  static int keys;
  /*
   * Declare the beginning of the protothread.
   */
  PT_BEGIN(pt);
  /*
   * We'll let the protothread loop until the protothread is
   * explicitly exited with PT_EXIT().
   */
  while(1) {
    /*
     * We'll be reading key presses until we get the right amount of
     * correct keys.
     */
    for(keys = 0; keys < sizeof(code); ++keys) {
      /*
       * If we haven't gotten any keypresses, we'll simply wait for one.
       */
      if(keys == 0) {
        /*
         * The PT_WAIT_UNTIL() function will block until the condition
         * key_pressed() is true.
         */
        PT_WAIT_UNTIL(pt, key_pressed());
      } else {
        /*
         * If the "keys" variable was larger than zero, we have already
         * gotten at least one correct key press. If so, we'll not
         * only wait for the next key, but we'll also set a timer that
         * expires in one second. This gives the person pressing the
         * keys one second to press the next key in the code.
         */
        timer_set(&codelock_timer, 1000);
        /*
         * The following statement shows how complex blocking
         * conditions can be easily expressed with protothreads and
         * the PT_WAIT_UNTIL() function.
         */
        PT_WAIT_UNTIL(pt, key_pressed() || timer_expired(&codelock_timer));
        /*
         * If the timer expired, we should break out of the for() loop
         * and start reading keys from the beginning of the while(1)
         * loop instead.
         */
        if(timer_expired(&codelock_timer)) {
          printf("Code lock timer expired.\n");
          /*
           * Break out from the for() loop and start from the
           * beginning of the while(1) loop.
           */
          break;
        }
      }
      /*
       * Check if the pressed key was correct.
       */
      if(key != code[keys]) {
        printf("Incorrect key '%c' found\n", key);
        /*
         * Break out of the for() loop since the key was incorrect.
         */
        break;
      } else {
        printf("Correct key '%c' found\n", key);
      }
    }
    /*
     * Check if we have gotten all keys.
     */
    if(keys == sizeof(code)) {
      printf("Correct code entered, waiting for 500 ms before unlocking.\n");
      /*
       * Ok, we got the correct code. But to make sure that the code
       * was not just a fluke of luck by an intruder, but the correct
       * code entered by a person that knows the correct code, we'll
       * wait for half a second before opening the lock. If another
       * key is pressed during this time, we'll assume that it was a
       * fluke of luck that the correct code was entered the first
       * time.
       */
      timer_set(&codelock_timer, 500);
      PT_WAIT_UNTIL(pt, key_pressed() || timer_expired(&codelock_timer));
      /*
       * If we continued from the PT_WAIT_UNTIL() statement without
       * the timer having expired, we don't open the lock.
       */
      if(!timer_expired(&codelock_timer)) {
        printf("Key pressed during final wait, code lock locked again.\n");
      } else {
        /*
         * If the timer expired, we'll open the lock and exit from the
         * protothread.
         */
        printf("Code lock unlocked.\n");
        PT_EXIT(pt);
      }
    }
  }
  /*
   * Finally, we'll mark the end of the protothread.
   */
  PT_END(pt);
}
The bounded buffer with protothread semaphores
#include "pt-sem.h"
#define NUM_ITEMS 32
#define BUFSIZE 8
static struct pt_sem full, empty;
static
PT_THREAD(producer(struct pt *pt))
{
  static int produced;
  PT_BEGIN(pt);
  for(produced = 0; produced < NUM_ITEMS; ++produced) {
    PT_SEM_WAIT(pt, &full);
    add_to_buffer(produce_item());
    PT_SEM_SIGNAL(pt, &empty);
  }
  PT_END(pt);
}
static
PT_THREAD(consumer(struct pt *pt))
{
  static int consumed;
  PT_BEGIN(pt);
  for(consumed = 0; consumed < NUM_ITEMS; ++consumed) {
    PT_SEM_WAIT(pt, &empty);
    consume_item(get_from_buffer());
    PT_SEM_SIGNAL(pt, &full);
  }
  PT_END(pt);
}
static
PT_THREAD(driver_thread(struct pt *pt))
{
  static struct pt pt_producer, pt_consumer;
  PT_BEGIN(pt);
  PT_SEM_INIT(&empty, 0);
  PT_SEM_INIT(&full, BUFSIZE);
  PT_INIT(&pt_producer);
  PT_INIT(&pt_consumer);
  PT_WAIT_THREAD(pt, producer(&pt_producer) &
                     consumer(&pt_consumer));
  PT_END(pt);
}
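To make the semaphore operations less opaque, here is a self-contained sketch of how counting semaphores might be layered on a blocking wait. It is an assumption-laden illustration, not the contents of pt-sem.h: the MINI_* macros, the ring buffer and the names free_slots and used_slots are all invented for the example (they play the roles of the "full" and "empty" semaphores in the example above).

```c
/* Hypothetical cut-down macros standing in for pt.h and pt-sem.h. */
struct mini_pt { unsigned short lc; };
enum { MINI_WAITING = 0, MINI_ENDED = 1 };
#define MINI_INIT(p)   ((p)->lc = 0)
#define MINI_BEGIN(p)  switch((p)->lc) { case 0:
#define MINI_END(p)    } (p)->lc = 0; return MINI_ENDED
#define MINI_WAIT_UNTIL(p, cond)               \
  do {                                         \
    (p)->lc = __LINE__; case __LINE__:         \
    if(!(cond)) { return MINI_WAITING; }       \
  } while(0)

/* A semaphore is just a counter; waiting blocks while it is zero. */
struct mini_sem { unsigned int count; };
#define MINI_SEM_INIT(s, n)  ((s)->count = (n))
#define MINI_SEM_SIGNAL(s)   (++(s)->count)
#define MINI_SEM_WAIT(p, s)                    \
  do {                                         \
    MINI_WAIT_UNTIL(p, (s)->count > 0);        \
    --(s)->count;                              \
  } while(0)

#define NUM_ITEMS 32
#define BUFSIZE 8
static int buffer[BUFSIZE];
static unsigned int head, tail, max_fill;
static struct mini_sem free_slots, used_slots;
static int sum_consumed;

static int producer(struct mini_pt *p)
{
  static int i;
  MINI_BEGIN(p);
  for(i = 0; i < NUM_ITEMS; ++i) {
    MINI_SEM_WAIT(p, &free_slots);      /* block while the buffer is full */
    buffer[head] = i;
    head = (head + 1) % BUFSIZE;
    MINI_SEM_SIGNAL(&used_slots);
    if(used_slots.count > max_fill) { max_fill = used_slots.count; }
  }
  MINI_END(p);
}

static int consumer(struct mini_pt *p)
{
  static int i;
  MINI_BEGIN(p);
  for(i = 0; i < NUM_ITEMS; ++i) {
    MINI_SEM_WAIT(p, &used_slots);      /* block while the buffer is empty */
    sum_consumed += buffer[tail];
    tail = (tail + 1) % BUFSIZE;
    MINI_SEM_SIGNAL(&free_slots);
  }
  MINI_END(p);
}

/* Returns the sum of all consumed items, or -1 if the buffer ever
   held more than BUFSIZE items. */
int run_bounded_buffer(void)
{
  struct mini_pt pp, pc;
  int pdone = 0, cdone = 0;
  MINI_INIT(&pp);
  MINI_INIT(&pc);
  MINI_SEM_INIT(&free_slots, BUFSIZE);
  MINI_SEM_INIT(&used_slots, 0);
  head = tail = max_fill = 0;
  sum_consumed = 0;
  while(!pdone || !cdone) {
    if(!pdone) { pdone = (producer(&pp) == MINI_ENDED); }
    if(!cdone) { cdone = (consumer(&pc) == MINI_ENDED); }
  }
  return (max_fill <= BUFSIZE) ? sum_consumed : -1;
}
```

Since protothreads are scheduled cooperatively and only give up control at a wait, the decrement that follows a successful wait needs no further locking in this sketch. Here run_bounded_buffer() returns 496, the sum of the items 0 to 31.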
A radio driver written both with protothreads and events
This example shows an interrupt handler in a device driver for a TR1001 radio chip. The driver receives incoming data in bytes and constructs a frame that is covered by a CRC checksum. The driver is implemented both with protothreads and with an explicit state machine. The state machine has 11 states and is implemented using the C switch() statement. In contrast, the protothreads-based implementation does not have any explicit states.
The flow of control in the state machine-based implementation is quite hard to follow from inspection of the code, whereas the flow of control is evident in the protothreads based implementation.
Protothreads-based implementation
PT_THREAD(tr1001_rxhandler(unsigned char incoming_byte))
{
  PT_YIELDING();
  static unsigned char rxtmp, tmppos;
  PT_BEGIN(&rxhandler_pt);
  while(1) {
    /* Wait until we receive the first synchronization byte. */
    PT_WAIT_UNTIL(&rxhandler_pt, incoming_byte == SYNCH1);
    tr1001_rxstate = RXSTATE_RECEVING;
    /* Read all incoming synchronization bytes. */
    PT_WAIT_WHILE(&rxhandler_pt, incoming_byte == SYNCH1);
    /* We should receive the second synch byte by now, otherwise we'll
       restart the protothread. */
    if(incoming_byte != SYNCH2) {
      PT_RESTART(&rxhandler_pt);
    }
    /* Reset the CRC. */
    rxcrc = 0xffff;
    /* Read packet header. */
    for(tmppos = 0; tmppos < TR1001_HDRLEN; ++tmppos) {
      /* Wait for the first byte of the packet to arrive. */
      PT_YIELD(&rxhandler_pt);
      /* If the incoming byte isn't a valid Manchester encoded byte,
         we start again from the beginning. */
      if(!me_valid(incoming_byte)) {
        PT_RESTART(&rxhandler_pt);
      }
      rxtmp = me_decode8(incoming_byte);
      /* Wait for the next byte to arrive. */
      PT_YIELD(&rxhandler_pt);
      if(!me_valid(incoming_byte)) {
        PT_RESTART(&rxhandler_pt);
      }
      /* Put together the two bytes into a single Manchester decoded
         byte. */
      tr1001_rxbuf[tmppos] = (rxtmp << 4) | me_decode8(incoming_byte);
      /* Calculate the CRC. */
      rxcrc = crc16_add(tr1001_rxbuf[tmppos], rxcrc);
    }
    /* Since we've got the header, we can grab the length from it. */
    tr1001_rxlen = ((((struct tr1001_hdr *)tr1001_rxbuf)->len[0] << 8) +
                    ((struct tr1001_hdr *)tr1001_rxbuf)->len[1]);
    /* If the length is longer than we can handle, we'll start from
       the beginning. */
    if(tmppos + tr1001_rxlen > sizeof(tr1001_rxbuf)) {
      PT_RESTART(&rxhandler_pt);
    }
    /* Read packet data. */
    for(tmppos = 6; tmppos < tr1001_rxlen + TR1001_HDRLEN; ++tmppos) {
      PT_YIELD(&rxhandler_pt);
      if(!me_valid(incoming_byte)) {
        PT_RESTART(&rxhandler_pt);
      }
      rxtmp = me_decode8(incoming_byte);
      PT_YIELD(&rxhandler_pt);
      if(!me_valid(incoming_byte)) {
        PT_RESTART(&rxhandler_pt);
      }
      tr1001_rxbuf[tmppos] = (rxtmp << 4) | me_decode8(incoming_byte);
      rxcrc = crc16_add(tr1001_rxbuf[tmppos], rxcrc);
    }
    /* Read the frame CRC. */
    for(tmppos = 0; tmppos < 4; ++tmppos) {
      PT_YIELD(&rxhandler_pt);
      if(!me_valid(incoming_byte)) {
        PT_RESTART(&rxhandler_pt);
      }
      rxcrctmp = (rxcrctmp << 4) | me_decode8(incoming_byte);
    }
    if(rxcrctmp == rxcrc) {
      /* A full packet has been received and the CRC checks out. We'll
         request the driver to take care of the incoming data. */
      tr1001_drv_request_poll();
      /* We'll set the receive state flag to signal that a full frame
         is present in the buffer, and we'll wait until the buffer has
         been taken care of. */
      tr1001_rxstate = RXSTATE_FULL;
      PT_WAIT_UNTIL(&rxhandler_pt, tr1001_rxstate != RXSTATE_FULL);
    }
  }
  PT_END(&rxhandler_pt);
}
Event-based implementation
/* No bytes read, waiting for synch byte. */
#define RXSTATE_READY 0
/* Second start byte read, waiting for header. */
#define RXSTATE_START 1
/* Reading packet header, first Manchester encoded byte. */
#define RXSTATE_HEADER1 2
/* Reading packet header, second Manchester encoded byte. */
#define RXSTATE_HEADER2 3
/* Reading packet data, first Manchester encoded byte. */
#define RXSTATE_DATA1 4
/* Reading packet data, second Manchester encoded byte. */
#define RXSTATE_DATA2 5
/* Receiving CRC16 */
#define RXSTATE_CRC1 6
#define RXSTATE_CRC2 7
#define RXSTATE_CRC3 8
#define RXSTATE_CRC4 9
/* A full packet has been received. */
#define RXSTATE_FULL 10
void
tr1001_rxhandler(unsigned char c)
{
  switch(tr1001_rxstate) {
  case RXSTATE_READY:
    if(c == SYNCH1) {
      tr1001_rxstate = RXSTATE_START;
      rxpos = 0;
    }
    break;
  case RXSTATE_START:
    if(c == SYNCH1) {
      tr1001_rxstate = RXSTATE_START;
    } else if(c == SYNCH2) {
      tr1001_rxstate = RXSTATE_HEADER1;
      rxcrc = 0xffff;
    } else {
      tr1001_rxstate = RXSTATE_READY;
    }
    break;
  case RXSTATE_HEADER1:
    if(me_valid(c)) {
      tr1001_rxbuf[rxpos] = me_decode8(c);
      tr1001_rxstate = RXSTATE_HEADER2;
    } else {
      tr1001_rxstate = RXSTATE_ERROR;
    }
    break;
  case RXSTATE_HEADER2:
    if(me_valid(c)) {
      tr1001_rxbuf[rxpos] = (tr1001_rxbuf[rxpos] << 4) |
                            me_decode8(c);
      rxcrc = crc16_add(tr1001_rxbuf[rxpos], rxcrc);
      ++rxpos;
      if(rxpos >= TR1001_HDRLEN) {
        tr1001_rxlen = ((((struct tr1001_hdr *)tr1001_rxbuf)->len[0] << 8) +
                        ((struct tr1001_hdr *)tr1001_rxbuf)->len[1]);
        /* Continue only if the packet fits in the buffer, mirroring
           the length check in the protothreads-based version;
           otherwise start over. */
        if(rxpos + tr1001_rxlen <= sizeof(tr1001_rxbuf)) {
          tr1001_rxstate = RXSTATE_DATA1;
        } else {
          tr1001_rxstate = RXSTATE_READY;
        }
      } else {
        tr1001_rxstate = RXSTATE_HEADER1;
      }
    } else {
      tr1001_rxstate = RXSTATE_READY;
    }
    break;
  case RXSTATE_DATA1:
    if(me_valid(c)) {
      tr1001_rxbuf[rxpos] = me_decode8(c);
      tr1001_rxstate = RXSTATE_DATA2;
    } else {
      tr1001_rxstate = RXSTATE_READY;
    }
    break;
  case RXSTATE_DATA2:
    if(me_valid(c)) {
      tr1001_rxbuf[rxpos] = (tr1001_rxbuf[rxpos] << 4) |
                            me_decode8(c);
      rxcrc = crc16_add(tr1001_rxbuf[rxpos], rxcrc);
      ++rxpos;
      if(rxpos == tr1001_rxlen + TR1001_HDRLEN) {
        tr1001_rxstate = RXSTATE_CRC1;
      } else if(rxpos > sizeof(tr1001_rxbuf)) {
        tr1001_rxstate = RXSTATE_READY;
      } else {
        tr1001_rxstate = RXSTATE_DATA1;
      }
    } else {
      tr1001_rxstate = RXSTATE_READY;
    }
    break;
  case RXSTATE_CRC1:
    if(me_valid(c)) {
      rxcrctmp = me_decode8(c);
      tr1001_rxstate = RXSTATE_CRC2;
    } else {
      tr1001_rxstate = RXSTATE_READY;
    }
    break;
  case RXSTATE_CRC2:
    if(me_valid(c)) {
      rxcrctmp = (rxcrctmp << 4) | me_decode8(c);
      tr1001_rxstate = RXSTATE_CRC3;
    } else {
      tr1001_rxstate = RXSTATE_READY;
    }
    break;
  case RXSTATE_CRC3:
    if(me_valid(c)) {
      rxcrctmp = (rxcrctmp << 4) | me_decode8(c);
      tr1001_rxstate = RXSTATE_CRC4;
    } else {
      tr1001_rxstate = RXSTATE_READY;
    }
    break;
  case RXSTATE_CRC4:
    if(me_valid(c)) {
      rxcrctmp = (rxcrctmp << 4) | me_decode8(c);
      if(rxcrctmp == rxcrc) {
        tr1001_rxstate = RXSTATE_FULL;
        tr1001_drv_request_poll();
      } else {
        tr1001_rxstate = RXSTATE_READY;
      }
    } else {
      tr1001_rxstate = RXSTATE_READY;
    }
    break;
  case RXSTATE_FULL:
    /* Just drop the incoming byte. */
    break;
  default:
    /* Just drop the incoming byte. */
    tr1001_rxstate = RXSTATE_READY;
    break;
  }
}
source: http://www.sics.se/~adam/pt/