Testing Simple Train Control System Using Its Formal Description
Written by Vitaliy Zakaznikov
In our previous post, Decoding Formal Description of a Simple Train Control System, we examined how to interpret a formal model defined through a Kripke structure for a simple train control system—the Ceiling Speed Monitoring Controller (CSMC). Now, we’re taking the next step by using this model to build a powerful test suite to verify an AI-generated Python implementation of the controller.
With AI tools like ChatGPT increasingly involved in code generation, rigorously testing this code is essential. So, we’ll develop the test suite and see how good the AI-generated code really is—buckle up, and let’s start testing!
Reference implementation
Before we can start testing, we need a concrete implementation of the system. Taking a modern approach, we’ll use an AI tool, ChatGPT, to provide us with a reference implementation in Python.
We’ll give the ChatGPT 4o model access to the original article, Decoding Formal Description of a Simple Train Control System, to provide context. In addition, we will restate the transition relation, R, which defines the system’s behavior as follows:
Refer to the Transition relation R section in the previous blog if the transition relation is unclear.
Next, we provided ChatGPT with the following prompt to generate a Python implementation:
"Can you write a Python program that will simulate the behavior of the system described by this transition relation? Make it a CLI utility that allows the user to set initial V_MRSP and guards, then prompts the user for V_est, shows the state of the system, and exits cleanly when CTRL-C is pressed."
Below is the Python program generated by ChatGPT, which we will now test.
def main():
    # Register the signal handler for clean exit on CTRL-C
    signal.signal(signal.SIGINT, signal_handler)

    # Prompt the user for initial setup values
    V_MRSP = float(input("Enter the initial maximum restricted speed (V_MRSP): "))
    gebi1_threshold = float(input("Enter the emergency brake guard threshold for V_MRSP > 110 (default is 15): ") or 15)
    gebi2_threshold = float(input("Enter the emergency brake guard threshold for V_MRSP <= 110 (default is 7.5): ") or 7.5)

    # Initialize the system with these parameters
    system = TrainControlSystem(V_MRSP, gebi1_threshold, gebi2_threshold)

    print("\nTrain Control System initialized.")
    print("Press CTRL-C to exit.\n")

    # Continuously prompt for V_est and update the system state
    while True:
        try:
            V_est = float(input("Enter the estimated speed (V_est): "))
            system.set_V_est(V_est)
        except ValueError:
            print("Invalid input. Please enter a numerical value for V_est.")


if __name__ == "__main__":
    main()
When you run the program, it will prompt you to enter V_MRSP and two thresholds used to determine when emergency braking should be applied. Afterward, it will ask for the estimated current speed of the train, V_est, display the system’s current state, and then wait for the next V_est reading. You can stop the program at any time by pressing CTRL-C.
$ python train_control_system.py
Enter the initial maximum restricted speed (V_MRSP): 100
Enter the emergency brake guard threshold for V_MRSP > 110 (default is 15): 15
Enter the emergency brake guard threshold for V_MRSP <= 110 (default is 7.5): 7.5

Train Control System initialized.
Press CTRL-C to exit.

Enter the estimated speed (V_est): 80
State: NS, V_est: 80.0, V_MRSP: 100.0, W: 0, EB: 0
Enter the estimated speed (V_est):
Building test input sequences
With the reference implementation in hand, we’re ready to verify that it meets the system’s behavioral requirements. Fortunately, since we have a formal definition of the system, these requirements are as precise as they can be.
What should we use from the formal definition to build our test sequences? The answer is straightforward: we need the set of atomic propositions AP to create input equivalence class partitions (IECP).
To construct input equivalence class partitions, we only need to focus on the propositions directly related to the inputs V_est and V_MRSP. Therefore, our input set becomes:
{V_MRSP > 110, V_est = 0, V_est > V_MRSP, V_est > V_MRSP + 7.5, V_est > V_MRSP + 15}
Let’s construct the initial table of input equivalence class partitions (IECP). This table includes every combination of each atomic proposition related to the inputs being either true or false, representing the Cartesian product of all possible input states.
Now, we can construct a table explicitly listing all 2^5 = 32 possible classes. From this set, we can then filter out combinations that have logical conflicts and select specific values for V_est and V_MRSP that satisfy each class. Alternatively, we could visualize these combinations graphically, as shown below.
For V_MRSP > 110, we can use the following values:
V_MRSP = 120, V_est ∈ {0, 50, 123, 132, 140}
For V_MRSP ≤ 110, we can use the following values:
V_MRSP = 100, V_est ∈ {0, 50, 105, 113, 120}
These values are chosen to cover each region of interest. Since the estimated speed cannot simultaneously fall into multiple regions, most of the 2^5 = 32 combinations are invalid. Only 10 classes remain as valid.
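The filtering step can be sketched in a few lines of Python. This is only an illustration, not part of the original test program: the function name is_consistent is hypothetical, and the sketch assumes V_MRSP is never negative, so a speed of zero can never exceed it.

```python
from itertools import product


def is_consistent(p0, p1, p2, p3, p4):
    """Return True if the truth assignment can be satisfied by real inputs.

    p0: V_MRSP > 110, p1: V_est = 0, p2: V_est > V_MRSP,
    p3: V_est > V_MRSP + 7.5, p4: V_est > V_MRSP + 15.
    Assumption: V_MRSP >= 0. p0 is unconstrained, so it is not checked.
    """
    if p4 and not p3:  # exceeding by 15 implies exceeding by 7.5
        return False
    if p3 and not p2:  # exceeding by 7.5 implies exceeding V_MRSP
        return False
    if p1 and p2:      # a speed of zero cannot exceed a non-negative V_MRSP
        return False
    return True


# Cartesian product of the five propositions being true or false
all_classes = list(product([True, False], repeat=5))
valid_classes = [c for c in all_classes if is_consistent(*c)]

print(len(all_classes), len(valid_classes))  # 32 10
```

Three implications between the propositions are enough to reduce the 32 candidate classes to the 10 valid ones.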
Another way to work this out
If we assign each atomic proposition a short name p_i, we get the following definitions:
p0: V_MRSP > 110. The MRSP is greater than 110.
p1: V_est = 0. The estimated speed of the train is zero.
p2: V_est > V_MRSP. The estimated speed exceeds the maximum restricted speed (MRSP).
p3: V_est > V_MRSP + 7.5. The estimated speed exceeds the MRSP by more than 7.5.
p4: V_est > V_MRSP + 15. The estimated speed exceeds the MRSP by more than 15.
From the table that includes all 32 classes, only the following valid combinations remain:
| Class | p0 | p1 | p2 | p3 | p4 | Explanation |
|-------|-----|-----|-----|-----|-----|-------------|
| 1 | p0 | p1 | ¬p2 | ¬p3 | ¬p4 | V_MRSP > 110, V_est = 0 |
| 2 | p0 | ¬p1 | ¬p2 | ¬p3 | ¬p4 | V_MRSP > 110, no thresholds exceeded |
| 3 | p0 | ¬p1 | p2 | ¬p3 | ¬p4 | V_MRSP > 110, V_est > V_MRSP without exceeding other thresholds |
| 4 | p0 | ¬p1 | p2 | p3 | ¬p4 | V_MRSP > 110, V_est > V_MRSP exceeding 7.5 but not 15 |
| 5 | p0 | ¬p1 | p2 | p3 | p4 | V_MRSP > 110, exceeding all thresholds |
| 6 | ¬p0 | p1 | ¬p2 | ¬p3 | ¬p4 | V_MRSP <= 110, V_est = 0 |
| 7 | ¬p0 | ¬p1 | ¬p2 | ¬p3 | ¬p4 | V_MRSP <= 110, no thresholds exceeded |
| 8 | ¬p0 | ¬p1 | p2 | ¬p3 | ¬p4 | V_MRSP <= 110, V_est > V_MRSP without exceeding other thresholds |
| 9 | ¬p0 | ¬p1 | p2 | p3 | ¬p4 | V_MRSP <= 110, V_est > V_MRSP exceeding 7.5 but not 15 |
| 10 | ¬p0 | ¬p1 | p2 | p3 | p4 | V_MRSP <= 110, exceeding all thresholds |
Then, you simply select values for each input that satisfy the conditions of each input equivalence class partition.
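As a quick sanity check, the sample values chosen earlier can be mapped back to their proposition vectors to confirm that they hit all 10 classes. The helper name propositions and the samples dictionary are illustrative only and do not come from the original test code.

```python
def propositions(v_mrsp, v_est):
    """Evaluate (p0, p1, p2, p3, p4) for one pair of inputs."""
    return (
        v_mrsp > 110,           # p0
        v_est == 0,             # p1
        v_est > v_mrsp,         # p2
        v_est > v_mrsp + 7.5,   # p3
        v_est > v_mrsp + 15,    # p4
    )


# Sample inputs listed in the article, keyed by V_MRSP
samples = {
    120: [0, 50, 123, 132, 140],
    100: [0, 50, 105, 113, 120],
}

covered = {
    propositions(v_mrsp, v_est)
    for v_mrsp, speeds in samples.items()
    for v_est in speeds
}

print(len(covered))  # 10
```

Each of the ten sample pairs lands in a different class, so no class is covered twice and none is missed.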
Coding up user actions
With our test input values defined, we can start writing the test code to automate all possible user actions, including:
Starting and stopping the program
Setting V_MRSP and threshold values
Entering estimated speed values
Verifying the normal status state
Verifying the warning status state
Verifying the emergency braking (intervention) status state
Since we’re working with a command-line interface (CLI) program, we’ll use the testflows.connect module to control it with the send and expect methods. Additionally, we’ll define a State class to encapsulate all elements of the controller’s state into a single object. This class will be crucial when we implement the controller’s behavior model, which will serve as a test oracle to provide the expected state for any combination of controller behavior.
Here is the implementation of all actions, including the State class, in Python:
import re

from testflows.core import *
from testflows.connect import Shell


@TestStep(Given)
def start_program(self):
    """Start the train control system program."""
    with Shell() as bash:
        with bash("python train_control_system.py", asynchronous=True) as controller:
            self.context.app = controller.app
            try:
                yield
            finally:
                with By("stopping the program with CTRL-C"):
                    self.context.app.child.send("\03")
                    self.context.app.child.expect("Exiting program.")
                    self.context.app.child.expect("\n", escape=False)
                    bash.expect(bash.prompt)


@TestStep(When)
def enter_speed(self, v_est):
    """Enter the estimated speed and then read and return current system state."""
    self.context.app.child.send(str(v_est))
    self.context.app.child.expect(str(v_est))
    self.context.app.child.expect(r"Enter the estimated speed \(V_est\)\:")
    # Example output: State: NS, V_est: 60.0, V_MRSP: 100.0, W: 0, EB: 0
    pattern = re.compile(
        r"State: (?P<State>\w+), "
        r"V_est: (?P<V_est>[\d.]+), "
        r"V_MRSP: (?P<V_MRSP>[\d.]+), "
        r"W: (?P<W>\d+), "
        r"EB: (?P<EB>\d+)"
    )
    return State(**pattern.match(self.context.app.child.before.strip()).groupdict())


@TestStep(Given)
def enter_vmrsp_and_thresholds(self, v_mrsp, g_ebi1_threshold=15, g_ebi2_threshold=7.5):
    """Enter V_MRSP and guard 1 and 2 thresholds."""
    app = self.context.app

    app.child.expect(r"Enter the initial maximum restricted speed \(V_MRSP\)\:")
    app.child.send(str(v_mrsp))
    app.child.expect("\n", escape=False)

    app.child.expect(
        r"Enter the emergency brake guard threshold for V_MRSP > 110 \(default is 15\)"
    )
    app.child.send(str(g_ebi1_threshold))
    app.child.expect("\n", escape=False)

    app.child.expect(
        r"Enter the emergency brake guard threshold for V_MRSP <= 110 \(default is 7.5\)"
    )
    app.child.send(str(g_ebi2_threshold))
    app.child.expect("\n", escape=False)

    app.child.expect("Press CTRL-C to exit.")
    app.child.expect("\n", escape=False)
    app.child.expect(r"Enter the estimated speed \(V_est\)\:")


@TestStep(Then)
def expect_normal_status(self, state, W, EB):
    """Expect normal status."""
    assert state == "NS", f"expected state NS but got {state}"
    assert W == 0, f"expected W=0 but got {W}"
    assert EB == 0, f"expected EB=0 but got {EB}"


@TestStep(Then)
def expect_warning_status(self, state, W, EB):
    """Expect warning status."""
    assert state == "WS", f"expected state WS but got {state}"
    assert W == 1, f"expected W=1 but got {W}"
    assert EB == 0, f"expected EB=0 but got {EB}"


@TestStep(Then)
def expect_emergency_brake_status(self, state, W, EB):
    """Expect emergency brake status."""
    assert state == "IS", f"expected state IS but got {state}"
    assert W == 1, f"expected W=1 but got {W}"
    assert EB == 1, f"expected EB=1 but got {EB}"
The behavior model
To check the numerous combinations of inputs—potentially hundreds—we need a behavior model that can accurately compute the expected state of the system for each case.
This behavior model is derived from the system’s transition relation R. Here it is again for easy reference, allowing you to compare it directly with the code implementation below:
class Model:
    """Behavior model of the controller (constructor and guards g_ebi1, g_ebi2 are not shown)."""

    def expect_normal_status(self, behavior):
        """Expect normal status."""
        current_state = behavior[-1]
        prev_state = behavior[-2]
        if prev_state.State == "NS":
            if current_state.V_est <= current_state.V_MRSP:
                return expect_normal_status
        elif prev_state.State == "WS":
            if current_state.V_est <= current_state.V_MRSP:
                return expect_normal_status
        elif prev_state.State == "IS":
            if current_state.V_est == 0:
                return expect_normal_status

    def expect_warning_status(self, behavior):
        """Expect warning status."""
        current_state = behavior[-1]
        prev_state = behavior[-2]
        if prev_state.State in ("NS", "WS"):
            if current_state.V_est > current_state.V_MRSP:
                if not (self.g_ebi1(current_state) or self.g_ebi2(current_state)):
                    return expect_warning_status

    def expect_emergency_brake_status(self, behavior):
        """Expect emergency brake status."""
        current_state = behavior[-1]
        prev_state = behavior[-2]
        if prev_state.State == "NS":
            if self.g_ebi1(current_state) or self.g_ebi2(current_state):
                return expect_emergency_brake_status
        elif prev_state.State == "WS":
            if self.g_ebi1(current_state) or self.g_ebi2(current_state):
                return expect_emergency_brake_status
        elif prev_state.State == "IS":
            if current_state.V_est > 0:
                return expect_emergency_brake_status

    def expect(self, behavior):
        """Return expected behavior."""
        return (
            self.expect_normal_status(behavior)
            or self.expect_warning_status(behavior)
            or self.expect_emergency_brake_status(behavior)
        )
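The model relies on two guards, g_ebi1 and g_ebi2, whose definitions are not included in the listing. The sketch below shows one plausible form, inferred from the CLI prompts (a threshold of 15 applies when V_MRSP > 110, and 7.5 when V_MRSP <= 110). The class name GuardSketch and the namedtuple Reading are hypothetical stand-ins, not the original Model or State.

```python
from collections import namedtuple

# Hypothetical stand-in for the State object: only the two speeds matter here
Reading = namedtuple("Reading", ["V_est", "V_MRSP"])


class GuardSketch:
    """Emergency brake guards, assumed from the CLI prompt wording."""

    def __init__(self, g_ebi1_threshold=15, g_ebi2_threshold=7.5):
        self.g_ebi1_threshold = g_ebi1_threshold
        self.g_ebi2_threshold = g_ebi2_threshold

    def g_ebi1(self, state):
        # Applies only when V_MRSP > 110
        return state.V_MRSP > 110 and state.V_est > state.V_MRSP + self.g_ebi1_threshold

    def g_ebi2(self, state):
        # Applies only when V_MRSP <= 110
        return state.V_MRSP <= 110 and state.V_est > state.V_MRSP + self.g_ebi2_threshold


guards = GuardSketch()
print(guards.g_ebi2(Reading(V_est=123, V_MRSP=100)))  # True: 123 > 107.5
print(guards.g_ebi1(Reading(V_est=123, V_MRSP=120)))  # False: 123 <= 135
```

Because the two guards are conditioned on disjoint ranges of V_MRSP, at most one of them can be true for any given reading.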
To use this model, we’ll define a new check_result action that takes a specified behavior and uses the model to determine the expected state of the controller.
@TestStep(Then)
def check_result(self, behavior, model):
    """Check the result of the train control system."""
    current_state = behavior[-1]
    expect = model.expect(behavior)
    debug(f"{current_state},expect={expect.__name__ if expect else None}")
    expect(state=current_state.State, W=current_state.W, EB=current_state.EB)
Writing the test to check all equivalence classes
With the inputs selected to cover each equivalence class, we can now write the actual test. Since the system is stateful, we need to account for transitions between states as the system progresses. Given that the system has only three internal states, we can apply the Pigeonhole principle, just as we did in the Determining the minimum number of calls section when testing the memory function in the previous article.
The test itself will consist of two parts: the TestOutline, which defines the overall structure of the test, and individual checks for specific combinations of V_MRSP, threshold values, and sequences of estimated speed readings.
@TestOutline(Scenario)
def check_combination(self, v_mrsp, g_ebi1_threshold, g_ebi2_threshold, speed_sequence):
    """Check specific combination of the controller behavior."""

    model = Model(g_ebi1_threshold=g_ebi1_threshold, g_ebi2_threshold=g_ebi2_threshold)
    behavior = []

    with Given("I start the train control system program"):
        start_program()

    with And("I set V_MRSP and guard thresholds"):
        enter_vmrsp_and_thresholds(
            v_mrsp=v_mrsp,
            g_ebi1_threshold=g_ebi1_threshold,
            g_ebi2_threshold=g_ebi2_threshold,
        )
        # add the initial state
        behavior.append(State(State="NS", V_est=0, V_MRSP=v_mrsp, W=0, EB=0))

    for i, v_est in enumerate(speed_sequence):
        with When(f"step {i}: speed is {v_est}"):
            state = enter_speed(v_est=v_est)
            behavior.append(state)
            check_result(behavior=behavior, model=model)
The outline above will be used in the check_all_combinations test, which will generate all the necessary combinations to test each value of V_MRSP, threshold settings, and estimated speed values corresponding to each selected V_MRSP.
for v_mrsp, (g_ebi1_threshold, g_ebi2_threshold) in product(list(v_mrsps.keys()), thresholds):
    speeds = v_mrsps[v_mrsp]
    for speed_sequence in product(speeds, repeat=3):
        Combination(
            f"#{i}: V_MRSP={v_mrsp},g_ebi1={g_ebi1_threshold},g_ebi2={g_ebi2_threshold},speeds={speed_sequence}",
            test=check_combination,
        )(
            v_mrsp=v_mrsp,
            g_ebi1_threshold=g_ebi1_threshold,
            g_ebi2_threshold=g_ebi2_threshold,
            speed_sequence=speed_sequence,
        )
        i += 1


@TestModule
def regression(self):
    """Exhaustively test train control system."""
    Feature(test=check_all_combinations)()


if main():
    regression()
Running the test program
The moment of truth has arrived! Is ChatGPT’s implementation of the controller correct? Let’s run our comprehensive test program:
python3 test_train_control_system.py -o slick
First bug
And, either unfortunately or fortunately for our test program, we catch the first bug!
➤ Module regression
➤ Feature check all combinations
✔ Combination #0﹕ V_MRSP=100,g_ebi1=15,g_ebi2=7․5,speeds=(0, 0, 0)
✔ Combination #1﹕ V_MRSP=100,g_ebi1=15,g_ebi2=7․5,speeds=(0, 0, 50)
✘ Combination #2﹕ V_MRSP=100,g_ebi1=15,g_ebi2=7․5,speeds=(0, 0, 123), AssertionError
✘ Feature check all combinations, AssertionError
✘ Module regression, AssertionError
AssertionError: expected state IS but got WS
The output indicates that we’ve found a case where we expected the controller to enter the emergency braking (intervention) status (IS) state, but instead, it was in the warning status (WS) state. Not good!
After reviewing the code, we identify the issue:
# Apply transition rules based on the current state
if self.state == "NS":
    if self.V_est <= self.V_MRSP:
        self.state = "NS"  # phi_0
    elif self.V_est > self.V_MRSP:
        self.state, self.W = "WS", 1  # phi_1
    # BUG: this needs to be checked before self.V_est > self.V_MRSP!
    elif gebi1 or gebi2:
        self.state, self.EB = "IS", 1  # phi_2
The code checks the gebi1 or gebi2 condition after self.V_est > self.V_MRSP, which is incorrect! It should first check for gebi1 or gebi2.
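To see why the ordering matters, here is a stripped-down sketch of the generated branch structure as a standalone function. The function name and the guard_active parameter are illustrative, and the guard value is computed with the 7.5 threshold for V_MRSP = 100. For any numeric speed, the first two conditions already cover every case, so the emergency brake branch can never run.

```python
def ns_transition_as_generated(v_est, v_mrsp, guard_active):
    """Mirror the branch order of the generated code for the NS state."""
    if v_est <= v_mrsp:
        return "NS"
    elif v_est > v_mrsp:
        return "WS"
    elif guard_active:
        return "IS"  # unreachable: the two branches above are exhaustive
    return None


# Sweep speeds from 0 to 200 with V_MRSP = 100 and the 7.5 guard threshold
reachable = {
    ns_transition_as_generated(v_est, 100, v_est > 100 + 7.5)
    for v_est in range(0, 201)
}

print(sorted(reachable))  # ['NS', 'WS']
```

Even at a speed of 200, well past the guard threshold, the function still reports WS, which matches the failure seen in the test output.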
Second bug
Let’s apply the fix to train_control_system.py and try again.
This time, the test program reveals that the warning signal (W) was not set when expected. Upon further investigation, we find that the controller entered the emergency braking (intervention) status state (IS) without setting the warning signal (W) to alert the driver.
It turns out the same block of code is to blame again—it had two bugs!
# Apply transition rules based on the current state
if self.state == "NS":
    if self.V_est <= self.V_MRSP:
        self.state = "NS"  # phi_0
    elif self.V_est > self.V_MRSP:
        self.state, self.W = "WS", 1  # phi_1
    # BUG: this needs to be checked before self.V_est > self.V_MRSP!
    elif gebi1 or gebi2:
        # BUG: warning signal self.W is not set to 1 when moving to the IS state!
        self.state, self.EB = "IS", 1  # phi_2
Fixing both bugs
After applying both fixes to the code, we finally get the following:
# Apply transition rules based on the current state
if self.state == "NS":
    if self.V_est <= self.V_MRSP:
        self.state = "NS"  # phi_0
    elif gebi1 or gebi2:
        self.state, self.W, self.EB = "IS", 1, 1  # phi_2
    elif self.V_est > self.V_MRSP:
        self.state, self.W = "WS", 1  # phi_1
Running the test program again, we see that it no longer fails immediately. This time, it actually completes successfully!
The test program successfully checked all 2 × 5^3 = 250 input combinations.
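The count can be reproduced with a short sketch that mirrors the nested product in the test. The contents of v_mrsps and thresholds below are assumptions: the speed lists are taken from the sample values listed earlier in this article, and a single threshold pair (15, 7.5) is assumed because it is the only one that appears in the test output.

```python
from itertools import product

# Assumed inputs: two V_MRSP values, each with five representative speeds
v_mrsps = {
    100: [0, 50, 105, 113, 120],
    120: [0, 50, 123, 132, 140],
}
# Assumed: one pair of guard thresholds (g_ebi1, g_ebi2)
thresholds = [(15, 7.5)]

combinations = [
    (v_mrsp, pair, speed_sequence)
    for v_mrsp, pair in product(v_mrsps, thresholds)
    for speed_sequence in product(v_mrsps[v_mrsp], repeat=3)
]

print(len(combinations))  # 250
```

Each V_MRSP contributes 5^3 = 125 speed sequences of length three, and with two V_MRSP values and one threshold pair that gives 250 in total.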
Conclusion
In this post, we created a comprehensive test based on the formal description of the Ceiling Speed Monitoring Controller (CSMC) discussed in the Decoding Formal Description of a Simple Train Control System article. By using the set of atomic propositions derived from the Kripke structure and creating input equivalence class partitions from it, our test successfully caught two critical bugs in the AI-generated reference implementation.
These bugs were subtle and would have been challenging to identify through visual code review alone. However, by employing rigorous testing techniques rooted in formal methods, we effectively identified both issues. That said, this test program does not guarantee the discovery of all possible bugs in any given implementation of the controller. The test program achieves thorough coverage only when the set of atomic propositions aligns perfectly with the model. Since we cannot always know the actual atomic propositions for a specific implementation, additional testing techniques are necessary to uncover any hidden atomic propositions that might exist in the code.
This is what makes testing both challenging and rewarding!