In this post we will see how to implement a Petri net on a microcontroller board such as Arduino. Petri nets are a powerful tool for modelling discrete-event systems, especially those with concurrent events.
In a previous post we saw finite state machines as a structured way to program an automaton. In a way, we can consider Petri nets as an evolution of state machines.
In a state machine only one state is active at any given moment. State changes happen through transitions, which depend only on the current state and on a condition associated with the transition.
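As a reminder of what that looks like in code, here is a minimal sketch of a state machine. The three states and the events are hypothetical, chosen only for illustration; they do not come from the previous post.

```cpp
#include <assert.h>
#include <stdint.h>

// Hypothetical states and events, used only for illustration.
enum class State : uint8_t { Idle, Running, Done };
enum class Event : uint8_t { Start, Finish, Reset, None };

// Returns the next state. Exactly one state is active at a time, and the
// result depends only on the current state and on the incoming event.
State nextState(State current, Event event)
{
    switch (current)
    {
    case State::Idle:    return event == Event::Start  ? State::Running : current;
    case State::Running: return event == Event::Finish ? State::Done    : current;
    case State::Done:    return event == Event::Reset  ? State::Idle    : current;
    }
    assert(false && "unreachable: every state is handled above");
    return current;
}
```

Calling `nextState(State::Idle, Event::Start)` gives `State::Running`, while an event that does not apply to the current state leaves it unchanged.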
In a Petri net, on the other hand, we have nodes instead of states (the literature usually calls them places). Each node can hold zero or more marks, which are usually called tokens. In the graphical representation of a Petri net, marks are normally drawn as dots inside the nodes.
Here are some graphical representations of simple Petri nets.
Another difference from a finite state machine is that the transitions of a Petri net can be associated with more than one input node and have more than one output node.
For a transition to fire, it must meet its associated condition and also have at least one mark in each of its input nodes. When every input node holds a mark we say that the transition is sensitized (or enabled), which means that it will fire as soon as its associated condition is met.
When a transition fires, it removes a mark from each of the input nodes and adds a mark to each output node.
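The firing rule fits in a few lines of code. The following is only a sketch under assumed names (`Transition`, `isSensitized` and `tryFire` are made up for this illustration), with the marking stored as a plain array of counters.

```cpp
#include <assert.h>
#include <stdint.h>

const uint8_t kNumNodes = 4;

// A transition lists the nodes it takes marks from and the nodes it gives marks to.
struct Transition
{
    const uint8_t* inputs;
    uint8_t numInputs;
    const uint8_t* outputs;
    uint8_t numOutputs;
};

// A transition is sensitized when every input node holds at least one mark.
bool isSensitized(const uint8_t* marks, const Transition& t)
{
    for (uint8_t i = 0; i < t.numInputs; i++)
    {
        assert(t.inputs[i] < kNumNodes);
        if (marks[t.inputs[i]] == 0) return false;
    }
    return true;
}

// Fires the transition if it is sensitized and its condition holds.
// Returns true when the marking was changed.
bool tryFire(uint8_t* marks, const Transition& t, bool condition)
{
    if (!condition || !isSensitized(marks, t)) return false;
    for (uint8_t i = 0; i < t.numInputs; i++) marks[t.inputs[i]]--;   // one mark out of each input
    for (uint8_t i = 0; i < t.numOutputs; i++) marks[t.outputs[i]]++; // one mark into each output
    return true;
}
```

With a transition from nodes 0 and 1 to nodes 2 and 3, `tryFire` does nothing while node 1 is empty, even if the condition is true. Once both inputs hold a mark, it moves the marking from `1 1 0 0` to `0 0 1 1`.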
Petri nets allow a more convenient representation of certain automata, especially those that require concurrent events or branch synchronization. For example, imagine an industrial machine in which one workstation cannot operate until several others have finished (e.g. I can’t assemble a juice bottle until I have the bottle, the cap, and the label).
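The bottle example can be sketched as a single transition with three input nodes. The node names below are hypothetical. Since a node can hold more than one mark, the number of assembled bottles is limited by the scarcest part.

```cpp
#include <assert.h>
#include <stdint.h>

// Hypothetical node indices for the bottling example.
enum Node : uint8_t { Bottles, Caps, Labels, Assembled, NodeCount };

// Fires the "assemble" transition as many times as the marking allows and
// returns how many bottles were assembled.
uint8_t assembleAll(uint8_t marks[NodeCount])
{
    uint8_t fired = 0;
    // The transition needs one mark in each of its three input nodes.
    while (marks[Bottles] > 0 && marks[Caps] > 0 && marks[Labels] > 0)
    {
        assert(marks[Assembled] < UINT8_MAX); // guard against counter overflow
        marks[Bottles]--;
        marks[Caps]--;
        marks[Labels]--;
        marks[Assembled]++;
        fired++;
    }
    return fired;
}
```

Starting with three bottles, two caps and one label, only one bottle is assembled. The remaining bottles and caps wait in their nodes until another label arrives.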
Petri net example
The classic example to illustrate the operation and usefulness of a Petri net is the case of two synchronized cars. We have two tracks, on which two cars circulate. Pressing a button makes the upper track car move to the other side. Another button does the same with the lower track car.
However, the cars must stay synchronized in order to move. That is, both cars must have changed sides before either of them can return. It is not possible, for example, for the upper car to change stations several times while the lower one stays still.
This problem admits a simple representation in the form of a Petri net, while in the case of implementing it as a state machine, the solution is comparatively more complex.
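One possible net for the two cars, written as code, could look like the sketch below. The exact layout is an assumption: each car gets four nodes (ready on the left, waiting on the right, ready on the right, waiting on the left), and two synchronizing transitions turn "everyone waiting" into "everyone ready".

```cpp
#include <assert.h>
#include <stdint.h>

const uint8_t kCars = 2;

// Per-car nodes. A "Waiting" node holds a car that has already crossed and is
// waiting for the other one to do the same.
enum CarNode : uint8_t { LeftReady, RightWaiting, RightReady, LeftWaiting, NodesPerCar };

struct TwoCarNet
{
    uint8_t marks[kCars][NodesPerCar];
};

// Initial marking: every car is ready on the left side.
void initNet(TwoCarNet& net)
{
    for (uint8_t c = 0; c < kCars; c++)
    {
        for (uint8_t n = 0; n < NodesPerCar; n++) net.marks[c][n] = 0;
        net.marks[c][LeftReady] = 1;
    }
}

// Moves one mark between two nodes of the same car if the source holds one.
static bool moveMark(TwoCarNet& net, uint8_t car, CarNode from, CarNode to)
{
    if (net.marks[car][from] == 0) return false;
    net.marks[car][from]--;
    net.marks[car][to]++;
    return true;
}

// Synchronizing transition: fires only when every car has a mark in `from`.
static void syncAll(TwoCarNet& net, CarNode from, CarNode to)
{
    for (uint8_t c = 0; c < kCars; c++)
        if (net.marks[c][from] == 0) return;
    for (uint8_t c = 0; c < kCars; c++)
        moveMark(net, c, from, to);
}

// Button press for one car. Returns true if the car moved.
bool pressButton(TwoCarNet& net, uint8_t car)
{
    assert(car < kCars);
    bool moved = moveMark(net, car, LeftReady, RightWaiting)
              || moveMark(net, car, RightReady, LeftWaiting);
    syncAll(net, RightWaiting, RightReady);
    syncAll(net, LeftWaiting, LeftReady);
    return moved;
}
```

After `pressButton(net, 0)` the upper car crosses, and a second press on the same car is ignored until `pressButton(net, 1)` has moved the lower car too. Only then can either of them return.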
The advantage of the Petri net becomes evident if, instead of two cars, we use three. The Petri net remains simple, while the complexity of the equivalent state machine grows rapidly.
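To put a number on that growth, we can count the reachable markings of the net, since an equivalent state machine needs one state for each of them. The sketch below assumes a net with four nodes per car (ready on the left, waiting on the right, ready on the right, waiting on the left) plus two synchronizing transitions. It stores each car's position in two bits and explores the markings breadth-first. It is meant for a desktop compiler, because its tables are too large for a small Arduino.

```cpp
#include <assert.h>
#include <stdint.h>

const uint8_t kMaxCars = 6;
const uint16_t kMaxCodes = 4096; // 4^kMaxCars possible encodings

// Positions of a car, two bits each:
// 0 = ready on the left, 1 = waiting on the right,
// 2 = ready on the right, 3 = waiting on the left.
uint16_t countReachableMarkings(uint8_t cars)
{
    assert(cars >= 1 && cars <= kMaxCars);
    static bool seen[kMaxCodes];
    static uint16_t queue[kMaxCodes];
    for (uint16_t i = 0; i < kMaxCodes; i++) seen[i] = false;

    // Markings in which every car is in the same position.
    uint16_t allWaitingRight = 0, allReadyRight = 0, allWaitingLeft = 0;
    for (uint8_t c = 0; c < cars; c++)
    {
        allWaitingRight |= (uint16_t)(1u << (2 * c));
        allReadyRight   |= (uint16_t)(2u << (2 * c));
        allWaitingLeft  |= (uint16_t)(3u << (2 * c));
    }

    uint16_t head = 0, tail = 0, count = 0;
    queue[tail++] = 0; // initial marking: every car ready on the left
    seen[0] = true;
    while (head < tail)
    {
        uint16_t code = queue[head++];
        count++;

        uint16_t next[kMaxCars + 1];
        uint8_t n = 0;
        for (uint8_t c = 0; c < cars; c++)
        {
            uint8_t pos = (code >> (2 * c)) & 3;
            // A ready car can cross and start waiting on the other side.
            if (pos == 0 || pos == 2) next[n++] = (uint16_t)(code + (1u << (2 * c)));
        }
        // Synchronizing transitions need every car to be waiting.
        if (code == allWaitingRight) next[n++] = allReadyRight;
        if (code == allWaitingLeft) next[n++] = 0;

        for (uint8_t i = 0; i < n; i++)
        {
            if (seen[next[i]]) continue;
            seen[next[i]] = true;
            queue[tail++] = next[i];
        }
    }
    return count;
}
```

Under these assumptions the net grows by four nodes and two transitions per car, while the count doubles with each car: `countReachableMarkings(2)` is 8 and `countReachableMarkings(3)` is 16.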
This illustrates the convenience and usefulness of Petri nets compared to finite state machines, especially in the handling of automata with parallel branches and the need for event synchronization.
Implementing a Petri net in Arduino
After this brief introduction to Petri nets, we are going to see that a simple implementation in Arduino is not as difficult as it seems.
Let’s see it with a simple example, a variation of the synchronized cars we mentioned earlier, although it may be easier to visualize if you imagine, for example, pieces on an assembly line.
In the example we have two branches, upper and lower, with four stations each, so we have a total of eight nodes: 0 to 3 in the upper branch and 4 to 7 in the lower one. The upper branch advances when it receives the character ‘A’ through the serial port and the lower one when it receives ‘B’.
To make it more interesting, the step out of the first station and the step into the last station accept both ‘A’ and ‘B’. With either of these inputs, the marks advance simultaneously in both branches.
And to add a timer, let’s assume that in the lower branch the mark returns from node 6 to node 5 if it has been waiting for 5 seconds without the upper branch catching up.
We are going to see a very light and simple implementation of this Petri net.
enum Input
{
    ForwardA = 0,
    ForwardB = 1,
    Unknown = 2
};

Input input;

const int numStates = 8;
uint8_t _markup[numStates];      // current marking
uint8_t _prevMarkup[numStates];  // copy used to check whether a transition is sensitized

// Definition of the transitions
// T0: nodes 0 and 4 -> nodes 1 and 5, on 'A' or 'B'
void Transition0()
{
    if (_prevMarkup[0] && _prevMarkup[4] && (input == Input::ForwardA || input == Input::ForwardB))
    {
        RemoveMark(0); RemoveMark(4);
        AddMark(1); AddMark(5);
        Serial.println("Fired T0");
        printMarkup();
    }
}

// T1: node 1 -> node 2, on 'A'
void Transition1()
{
    if (_prevMarkup[1] && input == Input::ForwardA)
    {
        RemoveMark(1);
        AddMark(2);
        Serial.println("Fired T1");
        printMarkup();
    }
}

// T2: nodes 2 and 6 -> nodes 3 and 7, on 'A' or 'B'
void Transition2()
{
    if (_prevMarkup[2] && _prevMarkup[6] && (input == Input::ForwardA || input == Input::ForwardB))
    {
        RemoveMark(2); RemoveMark(6);
        AddMark(3); AddMark(7);
        Serial.println("Fired T2");
        printMarkup();
    }
}

// T3: node 3 -> node 0, on 'A'
void Transition3()
{
    if (_prevMarkup[3] && input == Input::ForwardA)
    {
        RemoveMark(3);
        AddMark(0);
        Serial.println("Fired T3");
        printMarkup();
    }
}

// T4: node 5 -> node 6, on 'B'; also starts the timer
void Transition4()
{
    if (_prevMarkup[5] && input == Input::ForwardB)
    {
        RemoveMark(5);
        AddMark(6);
        activateTimer();
        Serial.println("Fired T4");
        printMarkup();
    }
}

// T5: node 7 -> node 4, on 'B'
void Transition5()
{
    if (_prevMarkup[7] && input == Input::ForwardB)
    {
        RemoveMark(7);
        AddMark(4);
        Serial.println("Fired T5");
        printMarkup();
    }
}

// T6: node 6 -> node 5, when the timer expires
void Transition6()
{
    if (_prevMarkup[6] && timerExpired())
    {
        RemoveMark(6);
        AddMark(5);
        Serial.println("Fired T6");
        printMarkup();
    }
}

void setup()
{
    Serial.begin(9600);

    // Initial state
    for (uint8_t index = 0; index < numStates; index++)
        _markup[index] = 0;
    _markup[0] = 1;
    _markup[4] = 1;

    printMarkup();  // Show the initial state
}

void loop()
{
    input = static_cast<Input>(readInput());
    Update();
}

// Update the net
void Update()
{
    // Copy markup to prevMarkup
    memcpy(_prevMarkup, _markup, numStates * sizeof(uint8_t));

    Transition0();
    Transition1();
    Transition2();
    Transition3();
    Transition4();
    Transition5();
    Transition6();
}

// Marks are added only to the current marking
void AddMark(uint8_t index)
{
    _markup[index]++;
}

// Marks are removed from both the current and the previous marking
void RemoveMark(uint8_t index)
{
    _prevMarkup[index]--;
    _markup[index]--;
}

// Read a character for the example
int readInput()
{
    Input currentInput = Input::Unknown;
    if (Serial.available())
    {
        char incomingChar = Serial.read();
        switch (incomingChar)
        {
            case 'A': currentInput = Input::ForwardA; break;
            case 'B': currentInput = Input::ForwardB; break;
            default: break;
        }
    }
    return currentInput;
}

// For example debugging
void printMarkup()
{
    // Upper branch
    Serial.print(_markup[0]);
    Serial.print(_markup[1]);
    Serial.print(_markup[2]);
    Serial.println(_markup[3]);

    // Lower branch
    Serial.print(_markup[4]);
    Serial.print(_markup[5]);
    Serial.print(_markup[6]);
    Serial.println(_markup[7]);
}

// Timer for transition 6
unsigned long previousMillis;
bool isTimerON = false;

void activateTimer()
{
    isTimerON = true;
    previousMillis = millis();
}

bool timerExpired()
{
    if (isTimerON == false) return false;
    if ((unsigned long)(millis() - previousMillis) >= 5000)
        return true;
    return false;
}
In this simple implementation we have, on the one hand, two arrays that store the current and the previous marking (we will see later why we need the previous one).
Most of the weight of the program is in the Transition0() to Transition6() functions, which contain all the logic of the transitions. Each one checks whether its transition is sensitized and whether the firing condition is met. If both hold, it performs the appropriate actions and updates the marking of the Petri net.
In the setup function, we simply set the marks to their initial position. In loop, we read a character from the serial port and update the state of the Petri net with the Update function. The Update function copies the current marking to the previous-marking array and then tries every transition. The reason for the copy is to prevent a transition that was not sensitized at the start of the iteration from firing because of marking changes made during that same iteration.
For this same reason, marks are changed only through the AddMark and RemoveMark functions. Both act on the new marking. But in the previous marking, which is the one used to decide whether a transition is sensitized, we only remove marks and never add them.
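The effect of the copy is easy to see in a reduced case. The sketch below is a hypothetical chain of three nodes, 0 -> 1 -> 2, whose transitions are always allowed to fire. It is updated once with the copy and once without it.

```cpp
#include <assert.h>
#include <stdint.h>
#include <string.h>

const uint8_t kNodes = 3;

// One update of the chain 0 -> 1 -> 2. With useSnapshot, sensitization is
// checked against a copy taken at the start of the update. Without it, the
// check uses the marking that is being modified.
void updateChain(uint8_t marks[kNodes], bool useSnapshot)
{
    uint8_t prev[kNodes];
    memcpy(prev, marks, sizeof(prev));

    for (uint8_t t = 0; t + 1 < kNodes; t++)
    {
        const uint8_t* view = useSnapshot ? prev : marks;
        if (view[t] == 0) continue;   // transition t is not sensitized

        assert(marks[t] > 0);
        if (useSnapshot) prev[t]--;   // removals are mirrored in the copy
        marks[t]--;
        marks[t + 1]++;               // additions never reach the copy
    }
}
```

Starting from `1 0 0`, the update with the copy gives `0 1 0`: the mark advances a single node. Without the copy the result is `0 0 1`, because the second transition sees the mark that the first one has just added, and the mark crosses two transitions in one iteration.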
For the timed transition, when transition 4 fires we activate a timer. Transition 6 has as its firing condition the value of the timerExpired function, which returns true once the timer has expired.
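The subtraction in timerExpired is what keeps the timer correct when the millisecond counter overflows and starts again from zero. The following sketch shows the same idea with the current time passed as a parameter, so it can be tried away from the board. The names `OneShotTimer`, `startTimer` and `hasExpired` are made up for this example; on Arduino, the time argument would be the value of `millis()`.

```cpp
#include <assert.h>
#include <stdint.h>

// One-shot timer that receives the current time as an argument.
struct OneShotTimer
{
    uint32_t startMs;
    uint32_t periodMs;
    bool running;
};

void startTimer(OneShotTimer& timer, uint32_t nowMs, uint32_t periodMs)
{
    assert(periodMs > 0);
    timer.startMs = nowMs;
    timer.periodMs = periodMs;
    timer.running = true;
}

// True once the period has elapsed. The unsigned subtraction gives the
// elapsed time even if the counter has wrapped around since the start.
bool hasExpired(const OneShotTimer& timer, uint32_t nowMs)
{
    if (!timer.running) return false;
    return (uint32_t)(nowMs - timer.startMs) >= timer.periodMs;
}
```

A timer started 296 ms before the 32-bit counter wraps, with a period of 5000 ms, reports expiry when the counter reads 4704, exactly 5000 ms later.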
Finally, we have additional functions for the example to work, such as readInput, which receives the input through the serial port, and printMarkup which shows the state of the net so that we can check the example’s operation.
This implementation is very simple, but it serves to illustrate that implementing a Petri net does not have to be as complex as it seems. We could improve it considerably by using arrays, structures and so on, but there is little point in doing it here because that is exactly what we are going to see in the next section.
Results
If we run the previous code, we can see the result in the serial monitor. We can change the state by sending the characters ‘A’ and ‘B’.
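If you do not have a board at hand, the behaviour of the net can also be checked on a desktop compiler. The following sketch is a table-driven rewrite of the example made only for that purpose. The names `Trigger`, `Transition`, `kNet` and `step` are assumptions of this sketch, and the timer is replaced by a boolean parameter.

```cpp
#include <assert.h>
#include <stdint.h>
#include <string.h>

const uint8_t kNodes = 8;
const uint8_t kTransitions = 7;

// What can trigger a transition (bit flags).
enum Trigger : uint8_t { OnA = 1, OnB = 2, OnTimeout = 4 };

struct Transition
{
    uint8_t inputs[2];
    uint8_t outputs[2];
    uint8_t arity;     // number of inputs, equal to the number of outputs in this net
    uint8_t triggers;  // combination of Trigger flags
};

// Same net as in the example: T0 to T6.
const Transition kNet[kTransitions] = {
    { {0, 4}, {1, 5}, 2, OnA | OnB },
    { {1, 0}, {2, 0}, 1, OnA },
    { {2, 6}, {3, 7}, 2, OnA | OnB },
    { {3, 0}, {0, 0}, 1, OnA },
    { {5, 0}, {6, 0}, 1, OnB },
    { {7, 0}, {4, 0}, 1, OnB },
    { {6, 0}, {5, 0}, 1, OnTimeout },
};

// Runs one update. `input` is 'A', 'B' or any other value for "no input".
// Returns the number of transitions fired.
uint8_t step(uint8_t marks[kNodes], char input, bool timedOut)
{
    uint8_t active = 0;
    if (input == 'A') active |= OnA;
    if (input == 'B') active |= OnB;
    if (timedOut) active |= OnTimeout;

    uint8_t prev[kNodes];
    memcpy(prev, marks, sizeof(prev));

    uint8_t fired = 0;
    for (uint8_t t = 0; t < kTransitions; t++)
    {
        const Transition& tr = kNet[t];
        if ((tr.triggers & active) == 0) continue;

        bool sensitized = true;
        for (uint8_t i = 0; i < tr.arity; i++)
            if (prev[tr.inputs[i]] == 0) sensitized = false;
        if (!sensitized) continue;

        for (uint8_t i = 0; i < tr.arity; i++)
        {
            assert(marks[tr.inputs[i]] > 0);
            prev[tr.inputs[i]]--;
            marks[tr.inputs[i]]--;
            marks[tr.outputs[i]]++;
        }
        fired++;
    }
    return fired;
}
```

Starting from the initial marking (one mark in node 0 and one in node 4), calling `step` with the inputs A, A, B, B, A, B takes the marks once around both branches and back to the initial marking. A third ‘A’ in a row, with the marks in nodes 2 and 5, fires nothing, because transition 2 also needs a mark in node 6.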
Petri net in a library
We have seen a very simple implementation of a Petri net in Arduino, among the many possible ways to do it. But it should be clear by now that it contains large repetitive parts.
In this case, the code is screaming for us to use objects, which in Arduino translates into making a library.
So here is the PetriNet library, which implements a Petri net in Arduino in a comfortable and simple way, with additional optimizations. You are invited to take a look at the code.
With this library, the previous example would be solved with the following code.
#include "PetriNetLib.h"

enum Input
{
    ForwardA = 0,
    ForwardB = 1,
    Unknown = 2
};

Input input;

PetriNet petriNet(8, 7);

void setup()
{
    Serial.begin(9600);

    // Definition of the example's Petri net
    // Inputs and outputs of the transitions
    static uint8_t inputs0[] = { 0, 4 };
    static uint8_t outputs0[] = { 1, 5 };

    static uint8_t inputs1[] = { 1 };
    static uint8_t outputs1[] = { 2 };

    static uint8_t inputs2[] = { 2, 6 };
    static uint8_t outputs2[] = { 3, 7 };

    static uint8_t inputs3[] = { 3 };
    static uint8_t outputs3[] = { 0 };

    static uint8_t inputs4[] = { 5 };
    static uint8_t outputs4[] = { 6 };

    static uint8_t inputs5[] = { 7 };
    static uint8_t outputs5[] = { 4 };

    // Transitions
    petriNet.SetTransition(0, inputs0, 2, outputs0, 2,
        []() -> bool {return input == Input::ForwardA || input == Input::ForwardB; },
        []() {Serial.println("Fired T0"); printMarkup(); });

    petriNet.SetTransition(1, inputs1, 1, outputs1, 1,
        []() -> bool {return input == Input::ForwardA; },
        []() {Serial.println("Fired T1"); printMarkup(); });

    petriNet.SetTransition(2, inputs2, 2, outputs2, 2,
        []() -> bool {return input == Input::ForwardA || input == Input::ForwardB; },
        []() {Serial.println("Fired T2"); printMarkup(); });

    petriNet.SetTransition(3, inputs3, 1, outputs3, 1,
        []() -> bool {return input == Input::ForwardA; },
        []() {Serial.println("Fired T3"); printMarkup(); });

    petriNet.SetTransition(4, inputs4, 1, outputs4, 1,
        []() -> bool {return input == Input::ForwardB; },
        []() {Serial.println("Fired T4"); printMarkup(); activateTimer(); });

    petriNet.SetTransition(5, inputs5, 1, outputs5, 1,
        []() -> bool {return input == Input::ForwardB; },
        []() {Serial.println("Fired T5"); printMarkup(); });

    // Transition 6 goes back from node 6 to node 5, so it reuses the arrays
    // of transition 4 with their roles swapped
    petriNet.SetTransition(6, outputs4, 1, inputs4, 1,
        timerExpired,
        []() {Serial.println("Resetting T6"); printMarkup(); });

    // Initial marking
    petriNet.SetMarkup(0, 1);
    petriNet.SetMarkup(4, 1);

    printMarkup();  // Show the initial state
}

void loop()
{
    input = static_cast<Input>(readInput());
    petriNet.Update();
}

// Read a character for the example
int readInput()
{
    Input currentInput = Input::Unknown;
    if (Serial.available())
    {
        char incomingChar = Serial.read();
        switch (incomingChar)
        {
            case 'A': currentInput = Input::ForwardA; break;
            case 'B': currentInput = Input::ForwardB; break;
            default: break;
        }
    }
    return currentInput;
}

// For example debugging
void printMarkup()
{
    // Upper branch
    Serial.print(petriNet.GetMarkup(0));
    Serial.print(petriNet.GetMarkup(1));
    Serial.print(petriNet.GetMarkup(2));
    Serial.println(petriNet.GetMarkup(3));

    // Lower branch
    Serial.print(petriNet.GetMarkup(4));
    Serial.print(petriNet.GetMarkup(5));
    Serial.print(petriNet.GetMarkup(6));
    Serial.println(petriNet.GetMarkup(7));
}

// Timer for transition 6
unsigned long previousMillis;
bool isTimerON = false;

void activateTimer()
{
    isTimerON = true;
    previousMillis = millis();
}

bool timerExpired()
{
    if (isTimerON == false) return false;
    if ((unsigned long)(millis() - previousMillis) >= 5000)
        return true;
    return false;
}
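A note on the lambdas passed to SetTransition: none of them captures anything, which is why they read the global `input`. A lambda without captures converts implicitly to a plain function pointer, and that is also why `timerExpired` can be passed in the same position. The sketch below shows the mechanism with hypothetical types (`Condition`, `Action`, `GuardedTransition`). It is an assumption about how such callbacks can be stored, not the actual code of the library.

```cpp
#include <assert.h>
#include <stdint.h>

// Hypothetical callback types: a condition returning bool and an action.
typedef bool (*Condition)();
typedef void (*Action)();

struct GuardedTransition
{
    Condition condition;
    Action action;
};

// Runs the action when the condition holds. Returns true if it ran.
bool fireIfAllowed(const GuardedTransition& t)
{
    assert(t.condition != nullptr);
    if (!t.condition()) return false;
    if (t.action != nullptr) t.action();
    return true;
}

// State read by the lambdas. A captureless lambda can only reach globals
// and statics, so the state it needs has to live there.
char lastInput = 0;
uint8_t firedCount = 0;

// Both lambdas convert to function pointers because they capture nothing.
const GuardedTransition kOnA = {
    []() -> bool { return lastInput == 'A'; },
    []() { firedCount++; }
};
```

With `lastInput` set to ‘B’, `fireIfAllowed(kOnA)` returns false and the action does not run. With ‘A’ it returns true and `firedCount` goes up by one.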
Download the code
All the code from this post is available for download on Github.