
State Machines

Introduction

State machines are essential in software for managing systems with multiple possible states and well-defined transitions. Often used in networking protocols, command processing, or user interfaces, state machines help ensure correct behavior by enforcing rules on how a program can transition from one state to another based on specific inputs or events.

In Rust, enums and pattern matching make it straightforward to create robust state machines. Rust’s type system enforces that only valid transitions happen, reducing errors that can arise in more loosely typed languages. In this article, we’ll explore how to design a state machine in Rust that’s both expressive and type-safe, with a concrete example of a networking protocol.

Setting Up the State Machine in Rust

The first step is to define the various states. Using Rust’s enum, we can represent each possible state within our state machine. For this example, let’s imagine we’re modeling a simple connection lifecycle for a network protocol.

Here’s our ConnectionState enum:

// Debug enables {:?} printing later on; Copy lets transition take the
// state by value even when it's stored inside a struct
#[derive(Debug, Clone, Copy)]
enum ConnectionState {
    Disconnected,
    Connecting,
    Connected,
    Error,
}

Each variant represents a specific state that our connection could be in. In a real-world application, you could add more states or include additional information within each state, but for simplicity, we’ll focus on these four.

Defining Transitions

Next, let’s define a transition function. This function will dictate the rules for how each state can move to another based on events. We’ll introduce another enum, Event, to represent the various triggers that cause state transitions:

enum Event {
    StartConnection,
    ConnectionSuccessful,
    ConnectionFailed,
    Disconnect,
}

Our transition function will take in the current state and an event, then use pattern matching to determine the next state.

impl ConnectionState {
    fn transition(self, event: Event) -> ConnectionState {
        match (self, event) {
            (ConnectionState::Disconnected, Event::StartConnection) => ConnectionState::Connecting,
            (ConnectionState::Connecting, Event::ConnectionSuccessful) => ConnectionState::Connected,
            (ConnectionState::Connecting, Event::ConnectionFailed) => ConnectionState::Error,
            (ConnectionState::Connected, Event::Disconnect) => ConnectionState::Disconnected,
            (ConnectionState::Error, Event::Disconnect) => ConnectionState::Disconnected,
            // No transition possible, remain in the current state
            (state, _) => state,
        }
    }
}

This function defines the valid state transitions:

  • If we’re Disconnected and receive a StartConnection event, we transition to Connecting.
  • If we’re Connecting and successfully connect, we move to Connected.
  • If a connection attempt fails, we transition to Error.
  • If we’re Connected or in an Error state and receive a Disconnect event, we return to Disconnected.

Any invalid state-event pair defaults to remaining in the current state.
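
If silently ignoring an invalid pair isn’t what you want, one variation (a sketch, not part of the original design) surfaces the failure to the caller instead:

impl ConnectionState {
    // like transition, but reports invalid state-event pairs rather than ignoring them
    fn try_transition(self, event: Event) -> Result<ConnectionState, (ConnectionState, Event)> {
        match (self, event) {
            (ConnectionState::Disconnected, Event::StartConnection) => Ok(ConnectionState::Connecting),
            (ConnectionState::Connecting, Event::ConnectionSuccessful) => Ok(ConnectionState::Connected),
            (ConnectionState::Connecting, Event::ConnectionFailed) => Ok(ConnectionState::Error),
            (ConnectionState::Connected, Event::Disconnect) => Ok(ConnectionState::Disconnected),
            (ConnectionState::Error, Event::Disconnect) => Ok(ConnectionState::Disconnected),
            // anything else is handed back to the caller as an error
            (state, event) => Err((state, event)),
        }
    }
}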

Implementing Transitions and Handling Events

To make the state machine operate, let’s add a Connection struct that holds the current state and handles the transitions based on incoming events.

struct Connection {
    state: ConnectionState,
}

impl Connection {
    fn new() -> Self {
        Connection {
            state: ConnectionState::Disconnected,
        }
    }

    fn handle_event(&mut self, event: Event) {
        self.state = self.state.transition(event);
    }
}

Now, we can initialize a connection and handle events:

fn main() {
    let mut connection = Connection::new();

    connection.handle_event(Event::StartConnection);
    println!("Current state: {:?}", connection.state); // Should be Connecting

    connection.handle_event(Event::ConnectionSuccessful);
    println!("Current state: {:?}", connection.state); // Should be Connected

    connection.handle_event(Event::Disconnect);
    println!("Current state: {:?}", connection.state); // Should be Disconnected
}

With this setup, we have a fully functional state machine that moves through a predictable set of states based on events. Rust’s pattern matching and type-checking ensure that only valid transitions are possible.

Other Usage

While our connection example is simple, state machines are invaluable for more complex flows, like command processing in a CLI or a network protocol. Imagine a scenario where we have commands that can only run under certain conditions.

Let’s say we have a simple command processing machine that recognizes two commands: Init and Process. The machine can only start processing after initialization. Here’s what the implementation might look like:

// as with ConnectionState, Copy allows in-place transitions
#[derive(Debug, Clone, Copy)]
enum CommandState {
    Idle,
    Initialized,
    Processing,
}

enum CommandEvent {
    Initialize,
    StartProcessing,
    FinishProcessing,
}

impl CommandState {
    fn transition(self, event: CommandEvent) -> CommandState {
        match (self, event) {
            (CommandState::Idle, CommandEvent::Initialize) => CommandState::Initialized,
            (CommandState::Initialized, CommandEvent::StartProcessing) => CommandState::Processing,
            (CommandState::Processing, CommandEvent::FinishProcessing) => CommandState::Initialized,
            (state, _) => state, // Remain in the current state if transition is invalid
        }
    }
}

With the same transition approach, we could build an interface to handle user commands, enforcing the correct order for initializing and processing. This could be extended to handle error states or additional command flows as needed.
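
To make that concrete, here’s a minimal sketch of such an interface; the CommandProcessor name and the event sequence below are illustrative assumptions:

struct CommandProcessor {
    state: CommandState,
}

impl CommandProcessor {
    fn new() -> Self {
        CommandProcessor { state: CommandState::Idle }
    }

    fn handle(&mut self, event: CommandEvent) {
        self.state = self.state.transition(event);
    }
}

fn main() {
    let mut processor = CommandProcessor::new();
    processor.handle(CommandEvent::StartProcessing);  // invalid: still Idle
    processor.handle(CommandEvent::Initialize);       // Idle -> Initialized
    processor.handle(CommandEvent::StartProcessing);  // Initialized -> Processing
    processor.handle(CommandEvent::FinishProcessing); // Processing -> Initialized
}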

Advantages of Using Rust for State Machines

Rust’s enums and pattern matching provide an efficient, type-safe way to create state machines. The compiler’s exhaustiveness checking means every state-event pair must be handled, whether by an explicit arm or a deliberate catch-all, so no transition goes unconsidered. Additionally:

  • Ownership and Lifetimes: Rust’s strict ownership model ensures that state transitions do not create unexpected side effects.
  • Pattern Matching: Pattern matching allows concise and readable code, making state transitions easy to follow.
  • Enums with Data: Rust enums can hold additional data for each state, providing more flexibility in complex state machines.
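
To illustrate that last point, each state can carry data of its own; the fields in this sketch are hypothetical:

enum ConnectionState {
    Disconnected,
    Connecting { attempt: u32 },    // hypothetical: how many retries so far
    Connected { session_id: u64 },  // hypothetical: id of the live session
    Error { reason: String },       // hypothetical: why the connection failed
}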

Rust’s approach to handling state machines is both expressive and ensures that your code remains safe and predictable. This makes Rust particularly suited for applications that require strict state management, such as networking or command-processing applications.

Conclusion

State machines are a powerful tool for managing structured transitions between states. Rust’s enums and pattern matching make implementing these machines straightforward, with added safety and performance benefits. By taking advantage of Rust’s type system, we can create state machines that are both readable and resistant to invalid transitions.

Reader Writer Locking

Introduction

The Reader-Writer problem is a classic synchronization problem that explores how multiple threads access shared resources when some only need to read the data, while others need to write (or modify) it.

In this problem:

  • Readers can access the resource simultaneously, as they only need to view the data.
  • Writers require exclusive access because they modify the data, and having multiple writers or a writer and a reader simultaneously could lead to data inconsistencies.

In Rust, this problem is a great way to explore RwLock (read-write lock), which allows us to grant multiple readers access to the data but restricts it to a single writer at a time.

Implementing

Here’s a step-by-step guide to implementing a simple version of this problem in Rust.

  1. Set up a shared resource: We’ll use an integer counter that both readers and writers will access.
  2. Create multiple readers and writers: Readers will print the current value, while writers will increment the value.
  3. Synchronize access: Using RwLock, we’ll ensure readers can access the counter simultaneously but block writers when they’re active.

Setting Up Shared State

To manage shared access to the counter, we use Arc<RwLock<T>>. Arc allows multiple threads to own the same data, and RwLock ensures that we can have either multiple readers or a single writer at any time.

Here’s the initial setup:

use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

fn main() {
    // Shared counter, initially 0, wrapped in RwLock and Arc for thread-safe access
    let counter = Arc::new(RwLock::new(0));

    // Vector to hold all reader and writer threads
    let mut handles = vec![];

Creating Reader Threads

Readers will read the counter’s value and print it. Since they only need to view the data, they’ll acquire a read lock on the RwLock.

Here’s how a reader thread might look:

    // create 5 reader threads
    for i in 0..5 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            loop {
                // acquire a read lock
                let read_lock = counter.read().unwrap();
                
                println!("Reader {} sees counter: {}", i, *read_lock);
                
                // simulate work
                thread::sleep(Duration::from_millis(100)); 
            }
        });
        handles.push(handle);
    }

Each reader:

  • Clones the Arc so it has its own reference to the shared counter.
  • Acquires a read lock with counter.read(), which allows multiple readers to access it simultaneously.
  • Prints the counter value and then waits briefly, simulating reading work.

Creating Writer Threads

Writers need exclusive access, as they modify the data. Only one writer can have a write lock on the RwLock at a time.

Here’s how we set up a writer thread:

    // create 2 writer threads
    for i in 0..2 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            loop {
                // acquire a write lock
                let mut write_lock = counter.write().unwrap();
                *write_lock += 1;
                println!("Writer {} increments counter to: {}", i, *write_lock);
                thread::sleep(Duration::from_millis(150)); // Simulate work
            }
        });
        handles.push(handle);
    }

Each writer:

  • Clones the Arc to access the shared counter.
  • Acquires a write lock with counter.write(). When a writer holds this lock, no other readers or writers can access the data.
  • Increments the counter and waits, simulating writing work.

Joining the Threads

Finally, we join the threads so the main program waits for all threads to finish. Since our loops are infinite for demonstration purposes, you might add a termination condition, such as a shared stop flag, to stop the threads gracefully; a sketch of that follows the code below.

    // wait for all threads to finish (they won't in this infinite example)
    for handle in handles {
        handle.join().unwrap();
    }
}
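
For instance, here’s a sketch of that variation applied to the reader threads, using an AtomicBool stop flag; the writers would check the same flag in their loops:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

fn main() {
    let counter = Arc::new(RwLock::new(0));
    let stop = Arc::new(AtomicBool::new(false));
    let mut handles = vec![];

    for i in 0..5 {
        let counter = Arc::clone(&counter);
        let stop = Arc::clone(&stop);
        handles.push(thread::spawn(move || {
            // run until the main thread raises the stop flag
            while !stop.load(Ordering::SeqCst) {
                let read_lock = counter.read().unwrap();
                println!("Reader {} sees counter: {}", i, *read_lock);
                drop(read_lock); // release the lock before sleeping
                thread::sleep(Duration::from_millis(100));
            }
        }));
    }

    // let the readers run briefly, then signal them to stop
    thread::sleep(Duration::from_secs(1));
    stop.store(true, Ordering::SeqCst);

    for handle in handles {
        handle.join().unwrap(); // now the joins actually return
    }
}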

Complete Code

Here’s the complete code breakdown for this problem:

use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

fn main() {
    let counter = Arc::new(RwLock::new(0));
    let mut handles = vec![];

    // Create reader threads
    for i in 0..5 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            loop {
                let read_lock = counter.read().unwrap();
                println!("Reader {} sees counter: {}", i, *read_lock);
                thread::sleep(Duration::from_millis(100));
            }
        });
        handles.push(handle);
    }

    // Create writer threads
    for i in 0..2 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            loop {
                let mut write_lock = counter.write().unwrap();
                *write_lock += 1;
                println!("Writer {} increments counter to: {}", i, *write_lock);
                thread::sleep(Duration::from_millis(150));
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

Key Components

  • Arc<RwLock<T>>: Arc provides shared ownership, and RwLock provides a mechanism for either multiple readers or a single writer.
  • counter.read() and counter.write(): RwLock’s .read() grants a shared read lock, and .write() grants an exclusive write lock. While the write lock is held, no other threads can acquire a read or write lock.
  • Concurrency Pattern: This setup ensures that multiple readers can operate simultaneously without blocking each other. However, when a writer needs access, it waits until all readers finish, and once it starts, it blocks other readers and writers.
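
These rules can be observed in miniature without any threads at all; here’s a small sketch using only the standard library:

use std::sync::RwLock;

fn main() {
    let lock = RwLock::new(5);

    let r1 = lock.read().unwrap();
    let r2 = lock.read().unwrap();       // multiple read guards coexist
    assert!(lock.try_write().is_err());  // the writer is shut out while they live
    drop(r1);
    drop(r2);

    let mut w = lock.write().unwrap();   // readers gone, exclusive access granted
    *w += 1;
    assert_eq!(*w, 6);
}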

Conclusion

The Reader-Writer problem is an excellent way to understand Rust’s concurrency features, especially RwLock. By structuring access in this way, we allow multiple readers or a single writer, which models real-world scenarios like database systems where reads are frequent but writes require careful, exclusive access.

Banker's Algorithm

Introduction

The Banker’s Algorithm is a classic algorithm used in operating systems to manage resource allocation and avoid deadlock, especially when dealing with multiple processes competing for limited resources. This problem provides an opportunity to work with data structures and logic that ensure safe, deadlock-free allocation.

In this implementation, we’ll use Rust to simulate the Banker’s Algorithm. Here’s what we’ll cover:

  • Introduction to the Banker’s Algorithm: Understanding the problem and algorithm.
  • Setting Up the System State: Define resources, allocation, maximum requirements, and available resources.
  • Implementing the Safety Check: Ensure that allocations leave the system in a safe state.
  • Requesting and Releasing Resources: Manage resources safely to prevent deadlock.

Banker’s Algorithm

The Banker’s Algorithm operates in a system where each process can request and release resources multiple times. The algorithm maintains a “safe state” by only granting resource requests if they don’t lead to a deadlock. This is done by simulating allocations and checking if the system can still fulfill all processes’ maximum demands without running out of resources.

Key components in the Banker’s Algorithm:

  • Available: The number of units of each resource type currently unallocated and free for processes to use.
  • Maximum: The maximum demand of each process for each resource.
  • Allocation: The amount of each resource currently allocated to each process.
  • Need: The remaining resources each process needs to fulfill its maximum demand, calculated as Need = Maximum - Allocation.

A system is considered in a “safe state” if there exists an order in which all processes can finish without deadlock. The Banker’s Algorithm uses this condition to determine if a resource request can be granted.

Implementation

We can now break this algorithm down and present it in Rust.

Setting Up the System State

Let’s start by defining the structures to represent the system’s resources, maximum requirements, current allocation, and needs.

#[derive(Debug)]
struct System {
    available: Vec<i32>,
    maximum: Vec<Vec<i32>>,
    allocation: Vec<Vec<i32>>,
    need: Vec<Vec<i32>>,
}

impl System {
    fn new(available: Vec<i32>, maximum: Vec<Vec<i32>>, allocation: Vec<Vec<i32>>) -> Self {
        let need = maximum.iter()
            .zip(&allocation)
            .map(|(max, alloc)| max.iter().zip(alloc).map(|(m, a)| m - a).collect())
            .collect();
        
        System {
            available,
            maximum,
            allocation,
            need,
        }
    }
}

In this structure:

  • available represents the resources of each type that are currently unallocated and free for use.
  • maximum is a matrix where each row represents a process, and each column represents the maximum number of each resource type the process might request.
  • allocation is a matrix indicating the currently allocated resources to each process.
  • need is derived from maximum - allocation and represents each process’s remaining resource requirements.

Need breakdown

Taking the following piece of code, we can do a pen-and-paper walkthrough:

let need = maximum.iter()
    .zip(&allocation)
    .map(|(max, alloc)| max.iter().zip(alloc).map(|(m, a)| m - a).collect())
    .collect();

Suppose:

  • maximum = [[7, 5, 3], [3, 2, 2], [9, 0, 2]]
  • allocation = [[0, 1, 0], [2, 0, 0], [3, 0, 2]]

Using the above code:

  1. maximum.iter().zip(&allocation) will produce pairs:

    • ([7, 5, 3], [0, 1, 0])
    • ([3, 2, 2], [2, 0, 0])
    • ([9, 0, 2], [3, 0, 2])
  2. For each pair, the inner map and collect will compute need:

    • For [7, 5, 3] and [0, 1, 0]: [7 - 0, 5 - 1, 3 - 0] = [7, 4, 3]
    • For [3, 2, 2] and [2, 0, 0]: [3 - 2, 2 - 0, 2 - 0] = [1, 2, 2]
    • For [9, 0, 2] and [3, 0, 2]: [9 - 3, 0 - 0, 2 - 2] = [6, 0, 0]
  3. The outer collect gathers these rows, producing:

    • need = [[7, 4, 3], [1, 2, 2], [6, 0, 0]]

So, need is the remaining resource requirements for each process. This line of code efficiently computes it by iterating and performing calculations on corresponding elements in maximum and allocation.
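
If the iterator chain feels dense, here’s an equivalent sketch with explicit loops that produces the same need matrix:

fn compute_need(maximum: &[Vec<i32>], allocation: &[Vec<i32>]) -> Vec<Vec<i32>> {
    let mut need = Vec::new();
    for (max_row, alloc_row) in maximum.iter().zip(allocation) {
        let mut row = Vec::new();
        for (m, a) in max_row.iter().zip(alloc_row) {
            row.push(m - a); // remaining demand for this resource type
        }
        need.push(row);
    }
    need
}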

Implementing the Safety Check

The safety check function will ensure that, after a hypothetical resource allocation, the system remains in a safe state.

Here’s the function to check if the system is in a safe state:

impl System {
    fn is_safe(&self) -> bool {
        let mut work = self.available.clone();
        let mut finish = vec![false; self.need.len()];
        
        loop {
            let mut progress = false;
            for (i, (f, n)) in finish.iter_mut().zip(&self.need).enumerate() {
                if !*f && n.iter().zip(&work).all(|(need, avail)| *need <= *avail) {
                    work.iter_mut().zip(&self.allocation[i]).for_each(|(w, &alloc)| *w += alloc);
                    *f = true;
                    progress = true;
                }
            }
            if !progress {
                break;
            }
        }
        
        finish.iter().all(|&f| f)
    }
}

Explanation:

  • Work Vector: work represents the available resources at each step.
  • Finish Vector: finish keeps track of whether each process can complete with the current work allocation.
  • We loop through each process, and if the process’s need can be satisfied by work, we simulate finishing the process by adding its allocated resources back to work.
  • This continues until no further progress can be made. If every entry in finish is true, the system is in a safe state.
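
As a quick sanity check, here’s a hedged test of is_safe against the example data used later in this post, with available as the unallocated remainder:

fn main() {
    let system = System::new(
        vec![3, 3, 2], // currently unallocated resources
        vec![vec![7, 5, 3], vec![3, 2, 2], vec![9, 0, 2], vec![2, 2, 2]],
        vec![vec![0, 1, 0], vec![2, 0, 0], vec![3, 0, 2], vec![2, 1, 1]],
    );

    // P1 can finish first, then P3, P0 and P2: a safe sequence exists
    assert!(system.is_safe());
}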

Requesting Resources

The request_resources function simulates a process requesting resources. The function will:

  1. Check if the request is within the need of the process.
  2. Temporarily allocate the requested resources and check if the system remains in a safe state.
  3. If the system is safe, the request is granted; otherwise, it is denied.

impl System {
    fn request_resources(&mut self, process_id: usize, request: Vec<i32>) -> bool {
        if request.iter().zip(&self.need[process_id]).any(|(req, need)| *req > *need) {
            println!("Error: Process requested more than its need.");
            return false;
        }

        if request.iter().zip(&self.available).any(|(req, avail)| *req > *avail) {
            println!("Error: Process requested more than available resources.");
            return false;
        }

        // Pretend to allocate resources
        for i in 0..request.len() {
            self.available[i] -= request[i];
            self.allocation[process_id][i] += request[i];
            self.need[process_id][i] -= request[i];
        }

        // Check if the system is safe
        let safe = self.is_safe();

        if safe {
            println!("Request granted for process {}", process_id);
        } else {
            // Roll back if not safe
            for i in 0..request.len() {
                self.available[i] += request[i];
                self.allocation[process_id][i] -= request[i];
                self.need[process_id][i] += request[i];
            }
            println!("Request denied for process {}: Unsafe state.", process_id);
        }

        safe
    }
}

Explanation:

  • The function checks if the request exceeds the need or available resources.
  • If the request can be granted, it temporarily allocates the resources, then calls is_safe to check if the new state is safe.
  • If the system remains in a safe state, the request is granted; otherwise, it rolls back the allocation.

Releasing Resources

Processes can release resources they no longer need. This function adds the released resources back to available and reduces the process’s allocation, clamping the release so a process can never hand back more than it currently holds.

impl System {
    fn release_resources(&mut self, process_id: usize, release: Vec<i32>) {
        for i in 0..release.len() {
            // clamp so a process can never release more than it holds
            let r = release[i].min(self.allocation[process_id][i]);
            self.available[i] += r;
            self.allocation[process_id][i] -= r;
            self.need[process_id][i] += r;
        }
        println!("Process {} released resources: {:?}", process_id, release);
    }
}

Example Usage

Here’s how you might set up and use the system:

fn main() {
    // resources currently unallocated (the allocation matrix below holds the rest)
    let available = vec![3, 3, 2];
    let maximum = vec![
        vec![7, 5, 3],
        vec![3, 2, 2],
        vec![9, 0, 2],
        vec![2, 2, 2],
    ];
    let allocation = vec![
        vec![0, 1, 0],
        vec![2, 0, 0],
        vec![3, 0, 2],
        vec![2, 1, 1],
    ];

    let mut system = System::new(available, maximum, allocation);

    println!("Initial system state: {:?}", system);

    // Process 1 requests resources
    system.request_resources(1, vec![1, 0, 2]);

    // Process 2 releases resources
    system.release_resources(2, vec![1, 0, 0]);

    // Check the system state
    println!("Final system state: {:?}", system);
}

This setup demonstrates the core of the Banker’s Algorithm: managing safe resource allocation in a multi-process environment. By using Rust’s safety guarantees, we’ve built a resource manager that can prevent deadlock.

Going Multithreaded

The Banker’s Algorithm, as traditionally described, is often presented in a sequential way to focus on the resource-allocation logic. However, implementing a multi-threaded version makes it more realistic and challenging, as you can simulate processes concurrently requesting and releasing resources.

Let’s extend this code to add a multi-threaded component. Here’s what we’ll do:

  • Simulate Processes as Threads: Each process will run in its own thread, randomly making requests for resources or releasing them.
  • Synchronize Access: Since multiple threads will access shared data (i.e., available, maximum, allocation, and need), we’ll need to use Arc and Mutex to make the data accessible and safe across threads.

Refactor the System Structure for Thread Safety

To allow multiple threads to safely access and modify the shared System data, we’ll use Arc<Mutex<System>> to wrap the entire System. This approach ensures that only one thread can modify the system’s state at any time.

Let’s update our code to add some dependencies. Note that rand is an external crate, so it will also need an entry such as rand = "0.8" in Cargo.toml:

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use rand::Rng;

Now, we’ll use Arc<Mutex<System>> to safely share this System across multiple threads.

Implement Multi-Threaded Processes

Each process (thread) will:

  1. Attempt to request resources at random intervals.
  2. Either succeed or get denied based on the system’s safe state.
  3. Occasionally release resources to simulate task completion.

Here’s how we might set this up:

fn main() {
    let available = vec![3, 3, 2]; // currently unallocated resources
    let maximum = vec![
        vec![7, 5, 3],
        vec![3, 2, 2],
        vec![9, 0, 2],
        vec![2, 2, 2],
    ];
    let allocation = vec![
        vec![0, 1, 0],
        vec![2, 0, 0],
        vec![3, 0, 2],
        vec![2, 1, 1],
    ];

    // Wrap the system in Arc<Mutex> for safe shared access
    let system = Arc::new(Mutex::new(System::new(available, maximum, allocation)));

    // Create threads for each process
    let mut handles = vec![];
    for process_id in 0..4 {
        let system = Arc::clone(&system);
        let handle = thread::spawn(move || {
            let mut rng = rand::thread_rng();
            loop {
                // Generate a random request with non-negative values within a reasonable range
                let request = vec![
                    rng.gen_range(0..=3),
                    rng.gen_range(0..=2),
                    rng.gen_range(0..=2),
                ];

                // Attempt to request resources
                {
                    let mut sys = system.lock().unwrap();
                    println!("Process {} requesting {:?}", process_id, request);
                    if sys.request_resources(process_id, request.clone()) {
                        println!("Process {} granted {:?}", process_id, request);
                    } else {
                        println!("Process {} denied {:?}", process_id, request);
                    }
                }

                thread::sleep(Duration::from_secs(1));

                // Occasionally release resources, ensuring non-negative values
                let release = vec![
                    rng.gen_range(0..=2),
                    rng.gen_range(0..=1),
                    rng.gen_range(0..=1),
                ];

                {
                    let mut sys = system.lock().unwrap();
                    sys.release_resources(process_id, release.clone());
                    println!("Process {} released {:?}", process_id, release);
                }

                thread::sleep(Duration::from_secs(2));
            }
        });
        handles.push(handle);
    }

    // Wait for all threads to finish (they won't in this infinite example)
    for handle in handles {
        handle.join().unwrap();
    }
}

Explanation of the Multi-Threaded Implementation

  1. Random Resource Requests and Releases:

    • Each process generates a random request vector simulating the resources it wants to acquire.
    • It then locks the system to call request_resources, either granting or denying the request based on the system’s safety check.
    • After a short wait, each process may release some resources (also randomly determined).
  2. Concurrency Management with Arc<Mutex<System>>:

    • Each process clones the Arc<Mutex<System>> handle, ensuring shared access to the system.
    • Before each request_resources or release_resources operation, each process locks the Mutex on System. This ensures that only one thread modifies the system at any given time, preventing race conditions.
  3. Thread Loop:

    • Each thread runs in an infinite loop, continuously requesting and releasing resources. This simulates real-world processes that may continuously request and release resources over time.

Conclusion

The Banker’s Algorithm is a powerful way to manage resources safely, and Rust’s type system and memory safety features make it well-suited for implementing such algorithms. By simulating requests, releases, and safety checks, you can ensure the system remains deadlock-free. This algorithm is especially useful in operating systems, databases, and network management scenarios.

By adding multi-threading to the Banker’s Algorithm, we’ve made the simulation more realistic, reflecting how processes in a real system might concurrently request and release resources. Rust’s Arc and Mutex constructs ensure safe shared access, aligning with Rust’s memory safety guarantees.

This multi-threaded implementation of the Banker’s Algorithm provides:

  • Deadlock Avoidance: Requests are only granted if they leave the system in a safe state.
  • Resource Allocation Simulation: Processes continually request and release resources, emulating a dynamic resource allocation environment.

A Star Pathfinding Algorithm

Introduction

Pathfinding is essential in many applications, from game development to logistics. A* (A-star) is one of the most efficient algorithms for finding the shortest path in grid-based environments, such as navigating a maze or finding the quickest route on a map. Combining the strengths of Dijkstra’s algorithm and greedy best-first search, A* is fast yet accurate, making it a popular choice for pathfinding tasks.

In this guide, we’ll walk through implementing A* in Rust, using Rust’s unique ownership model to handle node exploration and backtracking. We’ll also take advantage of data structures like priority queues and hash maps to keep the algorithm efficient and Rust-safe.

Core Concepts and Data Structures

Priority Queue

A* relies on a priority queue to process nodes in order of their estimated total path cost: the cost already paid to reach the node plus a heuristic estimate of the distance remaining. In Rust, the BinaryHeap structure provides a simple and efficient way to prioritize nodes on this basis.

Hash Map

We’ll use a HashMap to keep track of the best cost found for each node. This allows for quick updates and retrievals as we explore different paths in the grid.

Grid Representation

For simplicity, we’ll treat the grid as a rectangle of known dimensions, identifying each cell by its coordinates. A Node stores a cell’s coordinates and path costs, including the estimated cost to reach the target.

Ownership and Memory Management in Rust

Rust’s ownership model helps us manage memory safely while exploring nodes. As we record neighbors and update paths, Rust’s borrowing rules ensure we never create dangling references.

Where shared ownership is genuinely needed, Rc (reference counting) and RefCell allow multiple nodes to reference each other safely. In the implementation below, though, we sidestep that entirely by tracking costs and parent links in HashMaps keyed by position.

Implementation

Setup

First, let’s define the main components of our grid and the A* algorithm.

use std::collections::{BinaryHeap, HashMap};
use std::cmp::Ordering;

// Define a simple Node structure
#[derive(Copy, Clone, Eq, PartialEq)]
struct Node {
    position: (i32, i32),
    g_cost: i32,  // Cost from start to current node
    h_cost: i32,  // Heuristic cost from current to target node
}

impl Node {
    fn f_cost(&self) -> i32 {
        self.g_cost + self.h_cost
    }
}

// Implement Ord and PartialOrd to use Node in a priority queue
impl Ord for Node {
    fn cmp(&self, other: &Self) -> Ordering {
        other.f_cost().cmp(&self.f_cost()) // Reverse for min-heap behavior
    }
}

impl PartialOrd for Node {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

Initialize the Grid and Heuristic Function

A simple heuristic we’ll use here is the Manhattan distance, suitable for grids where movement is limited to horizontal and vertical steps.

fn heuristic(start: (i32, i32), target: (i32, i32)) -> i32 {
    (start.0 - target.0).abs() + (start.1 - target.1).abs()
}
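
For example, heuristic((0, 0), (4, 5)) evaluates to |0 - 4| + |0 - 5| = 9, the number of horizontal and vertical steps required if nothing blocked the way.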

Pathfinding Function

Here’s the core of the A* algorithm. We’ll initialize the grid, process nodes, and backtrack to find the shortest path.

fn astar(start: (i32, i32), target: (i32, i32), grid_size: (i32, i32)) -> Option<Vec<(i32, i32)>> {
    let mut open_set = BinaryHeap::new();
    let mut came_from = HashMap::new();
    let mut g_costs = HashMap::new();

    open_set.push(Node { position: start, g_cost: 0, h_cost: heuristic(start, target) });
    g_costs.insert(start, 0);

    while let Some(current) = open_set.pop() {
        if current.position == target {
            return Some(reconstruct_path(came_from, current.position));
        }

        for neighbor in get_neighbors(current.position, grid_size) {
            let tentative_g_cost = g_costs[&current.position] + 1;

            if tentative_g_cost < *g_costs.get(&neighbor).unwrap_or(&i32::MAX) {
                came_from.insert(neighbor, current.position);
                g_costs.insert(neighbor, tentative_g_cost);
                open_set.push(Node {
                    position: neighbor,
                    g_cost: tentative_g_cost,
                    h_cost: heuristic(neighbor, target),
                });
            }
        }
    }
    None
}

fn get_neighbors(position: (i32, i32), grid_size: (i32, i32)) -> Vec<(i32, i32)> {
    let (x, y) = position;
    let mut neighbors = Vec::new();
    for (dx, dy) in &[(0, 1), (1, 0), (0, -1), (-1, 0)] {
        let nx = x + dx;
        let ny = y + dy;
        if nx >= 0 && nx < grid_size.0 && ny >= 0 && ny < grid_size.1 {
            neighbors.push((nx, ny));
        }
    }
    neighbors
}

fn reconstruct_path(came_from: HashMap<(i32, i32), (i32, i32)>, mut current: (i32, i32)) -> Vec<(i32, i32)> {
    let mut path = Vec::new();
    while let Some(&prev) = came_from.get(&current) {
        path.push(current);
        current = prev;
    }
    path.reverse();
    path
}

Explanation

  1. Initialization: We add the start node to the open_set with an initial g_cost of 0.
  2. Exploration: We pop nodes from open_set, starting with the lowest f_cost. If a neighbor has a lower g_cost (cost so far) than previously recorded, we update its cost and re-insert it into open_set.
  3. Backtracking: Once we reach the target, we backtrack through the came_from map to reconstruct the path. The start cell has no came_from entry, so the returned path begins at the first step after the start.

Running the A* Algorithm

Finally, let’s run the algorithm to see the path from a start to target position:

fn main() {
    let start = (0, 0);
    let target = (4, 5);
    let grid_size = (10, 10);

    match astar(start, target, grid_size) {
        Some(path) => {
            println!("Path found: {:?}", path);
        }
        None => {
            println!("No path found");
        }
    }
}

Output

This will display the sequence of nodes from start to target, giving us the shortest path using the A* algorithm.

We can restructure these functions to also add visual representations on screen.

use std::collections::{BinaryHeap, HashMap};
use std::{cmp::Ordering, thread, time::Duration};

// Define symbols for visualization
const EMPTY: char = '.';
const START: char = 'S';
const TARGET: char = 'T';
const OPEN: char = '+';
const CLOSED: char = '#';
const PATH: char = '*';

#[derive(Copy, Clone, Eq, PartialEq)]
struct Node {
    position: (i32, i32),
    g_cost: i32,
    h_cost: i32,
}

impl Node {
    fn f_cost(&self) -> i32 {
        self.g_cost + self.h_cost
    }
}

impl Ord for Node {
    fn cmp(&self, other: &Self) -> Ordering {
        other.f_cost().cmp(&self.f_cost())
    }
}

impl PartialOrd for Node {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn heuristic(start: (i32, i32), target: (i32, i32)) -> i32 {
    (start.0 - target.0).abs() + (start.1 - target.1).abs()
}

fn astar(start: (i32, i32), target: (i32, i32), grid_size: (i32, i32)) -> Option<Vec<(i32, i32)>> {
    let mut open_set = BinaryHeap::new();
    let mut came_from = HashMap::new();
    let mut g_costs = HashMap::new();
    let mut closed_set = Vec::new();

    open_set.push(Node { position: start, g_cost: 0, h_cost: heuristic(start, target) });
    g_costs.insert(start, 0);

    // Initial grid setup
    let mut grid = vec![vec![EMPTY; grid_size.1 as usize]; grid_size.0 as usize];
    grid[start.0 as usize][start.1 as usize] = START;
    grid[target.0 as usize][target.1 as usize] = TARGET;

    // Display progress with each iteration
    while let Some(current) = open_set.pop() {
        if current.position == target {
            return Some(reconstruct_path(came_from, current.position, &mut grid, start, target));
        }
        
        closed_set.push(current.position);
        grid[current.position.0 as usize][current.position.1 as usize] = CLOSED;
        display_grid(&grid);
        thread::sleep(Duration::from_millis(100)); // Slow down for visualization

        for neighbor in get_neighbors(current.position, grid_size) {
            let tentative_g_cost = g_costs[&current.position] + 1;
            if tentative_g_cost < *g_costs.get(&neighbor).unwrap_or(&i32::MAX) {
                came_from.insert(neighbor, current.position);
                g_costs.insert(neighbor, tentative_g_cost);

                grid[neighbor.0 as usize][neighbor.1 as usize] = OPEN;
                open_set.push(Node {
                    position: neighbor,
                    g_cost: tentative_g_cost,
                    h_cost: heuristic(neighbor, target),
                });
            }
        }
    }
    None
}

fn get_neighbors(position: (i32, i32), grid_size: (i32, i32)) -> Vec<(i32, i32)> {
    let (x, y) = position;
    let mut neighbors = Vec::new();
    for (dx, dy) in &[(0, 1), (1, 0), (0, -1), (-1, 0)] {
        let nx = x + dx;
        let ny = y + dy;
        if nx >= 0 && nx < grid_size.0 && ny >= 0 && ny < grid_size.1 {
            neighbors.push((nx, ny));
        }
    }
    neighbors
}

fn reconstruct_path(
    came_from: HashMap<(i32, i32), (i32, i32)>, 
    mut current: (i32, i32), 
    grid: &mut Vec<Vec<char>>, 
    start: (i32, i32), 
    target: (i32, i32)
) -> Vec<(i32, i32)> {
    let mut path = Vec::new();
    while let Some(&prev) = came_from.get(&current) {
        path.push(current);
        current = prev;
    }
    path.reverse();

    for &(x, y) in &path {
        grid[x as usize][y as usize] = PATH;
    }
    grid[start.0 as usize][start.1 as usize] = START;
    grid[target.0 as usize][target.1 as usize] = TARGET;
    display_grid(&grid);

    path
}

fn display_grid(grid: &Vec<Vec<char>>) {
    println!("\x1B[2J\x1B[1;1H"); // Clear the screen
    for row in grid {
        for cell in row {
            print!("{} ", cell);
        }
        println!();
    }
}

fn main() {
    let start = (0, 0);
    let target = (5, 5);
    let grid_size = (10, 10);

    match astar(start, target, grid_size) {
        Some(path) => {
            println!("Path found: {:?}", path);
        }
        None => {
            println!("No path found");
        }
    }
}

Explanation

  • display_grid: This function clears the screen and then prints the current state of the grid to visualize the progress.
  • Thread Sleep: A short delay is added to slow down the iteration for visual effect. You can adjust Duration::from_millis(100) to control the speed.
  • Symbols:
    • S (START) and T (TARGET): The endpoints of the search.
    • + (OPEN): Frontier nodes still being considered.
    • # (CLOSED): Nodes already explored.
    • * (PATH): Final path from start to target.
    • . (EMPTY): Cells not yet touched.

After running this code, you should see the path being solved from one point to another:

S * * * + . . . . . 
# # # * + . . . . . 
# # # * * + . . . . 
+ # # # * + . . . . 
+ # # # * + . . . . 
. + # # * T . . . . 
. . + + + . . . . . 
. . . . . . . . . . 
. . . . . . . . . . 
. . . . . . . . . . 
Path found: [(0, 1), (0, 2), (0, 3), (1, 3), (2, 3), (2, 4), (3, 4), (4, 4), (5, 4), (5, 5)]

Conclusion

The A* algorithm can be optimized by tuning the heuristic function. In Rust, using BinaryHeap and HashMap helps manage the exploration process efficiently, and Rust’s ownership model enforces safe memory practices.

A* in Rust is an excellent example of how the language’s unique features can be leveraged to implement classic algorithms effectively. Rust’s focus on memory safety and efficient abstractions makes it a strong candidate for implementing pathfinding and other performance-sensitive tasks.

The Producer-Consumer Pattern

Introduction

Concurrency can feel tricky to manage, especially in a language like Rust where memory safety is strictly enforced. The Producer-Consumer pattern, however, provides a clean way to manage workloads between multiple threads, ensuring data is processed as it’s produced. In this post, we’ll explore this pattern by building a prime number checker in Rust. We’ll use a producer to generate candidate numbers and several consumers to check each candidate’s primality.

By the end of this post, you’ll understand how to manage shared data between threads, use an atomic flag for graceful termination, and leverage Rust’s concurrency tools.

The Producer-Consumer Pattern

In the Producer-Consumer pattern, one or more producers generate data and place it in a shared buffer. Consumers then take this data from the buffer and process it. This pattern is great for tasks that can be distributed, as it allows producers and consumers to run concurrently without overwhelming the system.

In our example, the producer is a main thread that generates numbers for prime-checking, while the consumers are threads that take these numbers and determine if they are prime. Here’s what we’ll build:

  1. A producer (main thread) to generate candidate numbers.
  2. A shared buffer where the candidates are stored.
  3. Consumers (worker threads) that retrieve numbers from the buffer and check their primality.
  4. An atomic stop flag, set from a SIGINT (Ctrl+C) handler, so the program can stop cleanly.

Code Overview

Let’s break down the code section by section.

Setting Up Shared State

First, we need to set up the shared data structures for our threads. Rust provides a few useful tools for this: Arc, Mutex, and AtomicBool.

  • Arc<T> (Atomic Reference Counting) allows us to share ownership of data between threads.
  • Mutex<T> protects shared data, ensuring only one thread can access it at a time.
  • AtomicBool is a thread-safe boolean that can be modified atomically, which we’ll use to signal our threads when it’s time to stop.

use std::sync::{Arc, Mutex, atomic::{AtomicBool, Ordering}};
use std::thread;
use std::time::Duration;
use ctrlc;

fn main() {
    // small primes for checking divisibility
    let primes = vec![2, 3, 5, 7, 11, 13, 17]; 
    
    // shared buffer with Mutex and Arc
    let candidates_mutex = Arc::new(Mutex::new(vec![])); 
    
    // this is our kill switch
    let stop_flag = Arc::new(AtomicBool::new(false));
    
    // here's where we're currently up to in the candidate check
    let mut current = 20;

Here, candidates_mutex is an Arc<Mutex<Vec<i32>>> — an atomic reference-counted mutex around a vector of candidate numbers (i32 is the inferred element type, since current defaults to i32). By wrapping our vector with Mutex, we ensure that only one thread can modify the buffer at any time, preventing race conditions.

The stop_flag is an AtomicBool, which will allow us to signal when it’s time for all threads to stop processing. We’ll look at how to use this flag in the section on handling SIGINT.

Stopping Gracefully

When running a multi-threaded application, it’s essential to handle termination gracefully. Here, we’ll use the ctrlc crate to catch the SIGINT signal (triggered by Ctrl+C) and set stop_flag to true to signal all threads to stop.

    let stop_flag_clone = stop_flag.clone();
    ctrlc::set_handler(move || {
        stop_flag_clone.store(true, Ordering::SeqCst);
        println!("Received SIGINT, stopping . . . ");
    }).expect("Error setting Ctrl-C handler");

The store function sets the value of stop_flag to true. We use Ordering::SeqCst, the strongest memory ordering, so the update takes part in a single total order that every thread observes consistently. This will stop all consumers from further processing once they next check the flag.

Creating the Consumer Threads

Now that we have a stop flag and a shared buffer, we can create our consumer threads. Each consumer thread will:

  1. Check the stop_flag.
  2. Attempt to retrieve a candidate number from candidates_mutex.
  3. Check if the candidate is prime.

Here’s the code:

let handles: Vec<_> = (0..3)
    .map(|_| {
        // clone all of our concurrent structures for each thread
        let candidates_mutex = Arc::clone(&candidates_mutex);
        let primes = primes.clone();
        let stop_flag_clone = stop_flag.clone();

        thread::spawn(move || {
            // we make sure that we're still "running"
            while !stop_flag_clone.load(Ordering::SeqCst) {
                
                let candidate = {
                    // lock the mutex to get a candidate number
                    let mut candidates = candidates_mutex.lock().unwrap();
                    candidates.pop()
                };

                // check that a candidate was available
                if let Some(num) = candidate {
                    
                    // perform a primality check (basic division check for illustration)
                    let is_prime = primes.iter().all(|&p| num % p != 0 || num == p);

                    if is_prime {
                        println!("{} is prime", num);
                    } else {
                        println!("{} is not prime", num);
                    }

                } else {
                    // If no candidates are available, wait a moment before retrying
                    thread::sleep(Duration::from_millis(10));
                }
            }
        })
    })
    .collect();

Explanation

  • Each consumer thread runs a loop, checking the stop_flag using the .load(Ordering::SeqCst) function on AtomicBool. This function reads the current value of stop_flag, and with Ordering::SeqCst, we ensure that all threads see consistent updates.
  • Inside the loop, the thread locks the candidates_mutex to safely access candidates.
  • If a candidate is available, the thread checks its primality using modulus operations. If no candidate is available, it sleeps briefly before trying again, minimizing CPU usage.
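
Note that the small-prime filter above is only illustrative: composites such as 361 (19 × 19) slip through, because 19 isn’t in the list. A standalone trial-division sketch that could stand in for it:

fn is_prime(n: i32) -> bool {
    if n < 2 {
        return false;
    }
    // test divisors up to the square root, using i64 to avoid overflow
    let mut d: i64 = 2;
    while d * d <= n as i64 {
        if (n as i64) % d == 0 {
            return false; // found a divisor, so n is composite
        }
        d += 1;
    }
    true
}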

Why .clone()?

You’ll notice at the start of the thread’s execution, we set up a group of clones to work with.

If you’re new to Rust, you might wonder why we need to clone our Arc references when passing them to threads. In many languages, you can freely share references between threads without much consideration. Rust, however, has strict rules about data ownership and thread safety, which is why cloning Arcs becomes essential.

Rust’s Ownership Model and Shared Data

Rust enforces a strong ownership model to prevent data races: every value has exactly one owner at a time, and a value moved into one thread becomes inaccessible to the others. This rule ensures that data cannot be modified simultaneously by multiple threads, which could lead to unpredictable behavior and bugs.

However, our prime-checker example needs multiple threads to access shared data (the list of candidates), which would normally violate Rust’s ownership rules. To make this possible, we use Arc and Mutex:

  1. Arc<T>: Atomic Reference Counting for Shared Ownership

Arc stands for Atomic Reference Counted, and it enables multiple threads to safely share ownership of the same data. Unlike a regular reference, Arc keeps a reference count, incremented each time you “clone” it. When the reference count drops to zero, Rust automatically deallocates the data.

Each clone of an Arc doesn’t copy the data itself; it only adds a new reference, allowing multiple threads to safely access the same data. A small sketch after this list shows this in action.

  2. Mutex<T>: Ensuring Safe Access

Wrapping the shared data (in this case, our vector of candidates) in a Mutex allows threads to lock the data for exclusive access, preventing simultaneous modifications. The combination of Arc and Mutex gives us shared, safe, and controlled access to the data across threads.
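
Here’s the sketch promised above, showing both halves in miniature: cloning the Arc adds a reference rather than copying the data, and the Mutex serializes access through either handle.

use std::sync::{Arc, Mutex};

fn main() {
    let shared = Arc::new(Mutex::new(vec![1, 2, 3]));
    let handle = Arc::clone(&shared);

    // both handles point at the same allocation...
    assert!(Arc::ptr_eq(&shared, &handle));
    // ...and the count records two owners
    assert_eq!(Arc::strong_count(&shared), 2);

    // either handle can lock and modify the same underlying data
    handle.lock().unwrap().push(4);
    assert_eq!(shared.lock().unwrap().len(), 4);
}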

Why Clone and Not Move?

You might wonder why we don’t simply “move” the Arc into each thread. Moving the Arc would transfer ownership to a single thread, leaving it inaccessible to other threads. Cloning allows us to create additional references to the same Arc, giving each thread access without compromising Rust’s ownership and thread-safety guarantees.

In essence, cloning an Arc doesn’t duplicate the data; it just creates another reference to it. This approach allows multiple threads to access shared data while still adhering to Rust’s safety guarantees.

By using Arc for shared ownership and Mutex for safe, exclusive access, we can implement the Producer-Consumer pattern in Rust without breaking any ownership or thread-safety rules.

Producing Prime Candidates

Now, let’s look at the producer, which is responsible for generating numbers and adding them to the shared buffer. Here’s how it works:

// Main thread generating numbers
loop {
    // if we get the stop flag, we stop producing candidates
    if stop_flag.load(Ordering::SeqCst) {
        println!("Main thread stopping...");
        break;
    }

    {
        // acquire a lock on the candidates vector as we need to push some new candidates on
        let mut candidates = candidates_mutex.lock().unwrap();

        // four potential candidates per group of 10: beyond 10, only numbers ending in 1, 3, 7, or 9 can be prime
        candidates.push(current + 1);
        candidates.push(current + 3);
        candidates.push(current + 7);
        candidates.push(current + 9);

        println!("Added candidates, buffer size: {}", candidates.len());
    }

    // move on to the next group of 10
    current += 10;

    // slow down for illustration
    thread::sleep(Duration::from_millis(1)); 
}

The producer runs in a loop, checking stop_flag with .load(Ordering::SeqCst) to know when to stop. It then locks candidates_mutex, adds numbers to the buffer, and increments current to generate new candidates.

This part ties back to the Producer-Consumer pattern: while the producer keeps generating numbers, the consumers independently pull them from the shared buffer for prime-checking.

Finishing up

Finally, we ensure a clean exit by calling join on each consumer thread. This function blocks until the thread completes, ensuring all threads finish gracefully.

    // Wait for all threads to finish
    for handle in handles {
        handle.join().unwrap();
    }

    println!("All threads exited. Goodbye!");
}

Conclusion

This project is a great way to explore concurrency in Rust. By applying the Producer-Consumer pattern, we can efficiently manage workloads across threads while ensuring safety with Arc, Mutex, and AtomicBool.

Key takeaways:

  • AtomicBool and .load(Ordering::SeqCst): Allow threads to check for a termination signal in a consistent manner.
  • Mutex and Arc for Shared Data: Ensures that multiple threads can safely read and write to the same data.
  • Producer-Consumer Pattern: A practical way to distribute workloads and ensure efficient resource utilization.
  • By catching SIGINT, we also made our program resilient to unexpected terminations, ensuring that threads exit cleanly.