JustToThePoint English Website Version
JustToThePoint en español
JustToThePoint in Thai

Search algorithms: BFS, DFS, A*

Introduction. Search algorithms.

A search algorithm is an algorithm, a plan, or a step-by-step procedure that solves a search problem. Suppose that you are trying to find your way around in a maze with walls or a town that has many one-way streets. There are many routes from your initial state (A) to the target destination (B), but you do not know which one to take.

It consists of a set of states, i.e., a set of all possible states or places where you can be; a start state (this is the initial state or place from where the search or path begins), and a goal state. There should be a function or a goal criterion that looks at the current state and it returns whether or not it is the goal state. Finally, the algorithm may find the solution in the form of an action sequence or path.

Depth-first search

Depth-first search (DFS) is an algorithm for traversing or searching tree or graph data structures. It starts at the root node (selecting some arbitrary node as the root node in the case of a graph) and explores as far as possible along each branch before backtracking.

from enum import Enum
import random, heapq
import copy
from rich import print
from typing import Deque

class Cell(Enum):
"""There are four types of cells: empty, blocked (wall), start, and goal. Finally, the algorithm may find the solution in the form of a path."""
    EMPTY = " "
    BLOCKED = "X"
    START = "S"
    GOAL = "G"
    PATH = "*"

def aux(c):
    """This is only an auxiliary method to print colored text to the terminal using Rich."""
    if c==Cell.PATH:
        return "[red]*[/red]"
    elif c==Cell.START:
        return "[blue]S[/blue]"
    elif c==Cell.GOAL:
        return "[blue]G[/blue]"
    elif c==Cell.BLOCKED:
        return "X"
    else:
        return " "

class Location():
    """ It represents a position or location in the maze.""" 
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __eq__(self, other):
        return self.x==other.x and self.y == other.y

    def __str__(self):
        print("(" + str(self.x) + ", " + str(self.y) + ")")

class Node():
    """ It represents a node in a search tree. It contains the actual position for the node, a pointer to its parent, the cost to reach the node (i.e., the movement cost to move from the starting point to the current node), and the heuristic value (a goal distance estimate).""" 
    def __init__(self, position, parent = None, cost = 0.0, heuristic = 0.0):
        self.position = position
        self.parent = parent
        self.cost = cost
        self.heuristic = heuristic

    def __lt__(self, other):
    """ The A* Search Algorithm picks the node or cell having the lowest f (f = g + h, g is the cost to reach the node and h is the "estimated" cost to move from the current node to the final destination) and processes that node."""
        return (self.cost + self.heuristic) < (other.cost + other.heuristic)

    def path_solution(self):
    """ It returns the full path or solution."""
        path = [self.position]
        currentNode = self
        while currentNode.parent is not None:
            currentNode = currentNode.parent
            path.append(currentNode.position)

        path.reverse()
        return path

DFS uses a stack that works on the LIFO (Last In First Out) principle.

A stack is a data structure used to store a collection of objects or elements where the addition of new items and the removal of existing items always take place at the top of the stack.

Stacks, LIFO

class Stack(object):
    def __init__(self):
        self.items = []

    def __len__(self): # It returns the number of elements in the stack.
        return len(self.items)

    def push(self, item): # It adds a new item to the top of the stack. 
        self.items.append(item)

The top of the stack is the last element in our list. However, the intricacies of our implementation are totally hidden from the user.

    def pop(self): # It removes the top element from the stack and returns the removed item.
        if not self.is_empty():
            return self.items.pop() # The Python's pop() method takes a single argument (index). If an argument is not passed to the function, it uses the default index -1. 

    def peek(self): # It returns the top element from the stack but does not remove it.
        if not self.is_empty():
            return self.items[-1]

    def is_empty(self): # It returns true when the stack is empty; otherwise, the function returns false. 
        return len(self.items) == 0

Finally, we are ready to implement the DFS algorithm and solve the searching problem.

class Maze:
    """ It implements our maze with a starting (startx, starty) and an ending point (goalx, goaly), and the desired number of rows and columns. If no file name is provided, it generates a random maze with a number of walls based on the sparseness argument."""
    def __init__ (self, startx, starty, goalx, goaly, rows = 10, columns = 10, sparseness = 0.2, fileName=None):
        self.rows = rows
        self.columns = columns
            
        if fileName is None:
            self.grid = [[Cell.EMPTY for c in range(columns)] for r in range(rows)]
            self.randomly_fill(sparseness)
        else:
            self.maze_fill(fileName)

        self.start = Location(startx, starty)
        self.goal = Location(goalx, goaly)
        self.grid[startx][starty] = Cell.START
        self.grid[goalx][goaly] = Cell.GOAL
                        
    # Out of the empty space in the grid, it fills it randomly with walls.
    def randomly_fill(self, sparseness):
        for row in range(self.rows):
            for column in range(self.columns):
                if random.uniform(0, 1.0) < sparseness:
                    self.grid[row][column] = Cell.BLOCKED

     # It creates or builds the maze from a file. This will be used for programming our version of [Pacman](https://www.justtothepoint.com/en/pacman/).
     def maze_fill(self, fileMaze):
        with open(fileMaze) as f:
            lines = f.readlines()
            lines = [line.replace(' ', '') for line in lines]

        self.rows = 36
        self.columns = 28
        self.grid = [[Cell.EMPTY for c in range(self.columns)] for r in range(self.rows)]
      
        for row, line in enumerate(lines):
            for col, character in enumerate(line):
                if row in [15, 16, 17, 18] and col in [12, 13, 14, 14]:
                # These positions correspond to the ghosts' home, so we consider them as empty cells.
                    self.grid[row][col] = Cell.EMPTY
                # These are the symbols for the nodes, paths, and portals in our pacman's maze.
                elif character in ['+', 'P', 'n', 'G', '.', '-', '|', 'p']:
                    self.grid[row][col] = Cell.EMPTY
                elif col!=self.columns:
                    self.grid[row][col] = Cell.BLOCKED

    # It returns a string representation of the maze for printing.
    def __str__(self):
        output: str = ""
        for row in self.grid:
            output += "".join([c.value for c in row]) + "\n"
        return output

    # It draws the maze with colors for better visualization.
    def draw(self):
        line = "".join(["-" for i in range(0,self.columns)])
        print(line)
        for row in self.grid:
            output = ""
            for c in row:
                output += aux(c)
            
            print(output)
        print(line)

    # This is the method that looks at position (i.e., the current state) and returns whether or not it is the goal state.
    def isGoalReached(self, position):
        return self.goal == position

    # It returns a list containing all valid directions from position.
    def validDirections(self, position):
        directions = []
        if position.x + 1 < self.rows and self.grid[position.x + 1][position.y] != Cell.BLOCKED:
            directions.append(Location(position.x + 1, position.y))
        if position.x - 1 >= 0 and self.grid[position.x - 1][position.y] != Cell.BLOCKED:
            directions.append(Location(position.x - 1, position.y))
        if position.y + 1 < self.columns and self.grid[position.x][position.y + 1] != Cell.BLOCKED:
            directions.append(Location(position.x, position.y + 1))
        if position.y - 1 >= 0 and self.grid[position.x][position.y - 1] != Cell.BLOCKED:
            directions.append(Location(position.x, position.y - 1))
        return directions

    # It implements the dfs algorithm.
    def dfs(self):
        frontier = Stack() # The frontier is a stack that is used to find the next nodes to be visited.
        frontier.push(Node(self.start)) # The starting node is labeled as discovered since it has already been visited, so it is added to the frontier.
        explored = { (self.start.x, self.start.y) } # A set is used to avoid re-traversing nodes.

        while not frontier.is_empty(): # While the frontier is not empty, there is a chance to find a solution.
            current_node = frontier.pop() # We pop the first element in the stack and set the corresponding element as our current node and position.
            current_position = current_node.position

            if self.isGoalReached(current_position): # If the current position is the goal state, we return the current node as we have already found a solution, and the current node has all the information that we need to find the solution path by backtracking through its parents or predecessors.
                return current_node

            for direction in self.validDirections(current_position): # It generates all the node's successors or valid directions from the current position.
                if (direction.x, direction.y) in explored: # If a node's successor has already been explored, we discard it as useless.
                    continue

                explored.add( (direction.x, direction.y) ) # Otherwise, it is added to the stack and set.
                frontier.push(Node(direction, current_node))

        return None # There is no solution to the maze.
    
    # It marks the solution path in the maze with breadcrumbs (aka *)
    def mark(self, path):
        for location in path:
            self.grid[location.x][location.y] = Cell.PATH
        
        self.grid[self.start.x][self.start.y] = Cell.START
        self.grid[self.goal.x][self.goal.y] = Cell.GOAL

    # It searches for the solution, clones the maze, and draws the solution in the cloned maze.
    def showSolutionDfs(self):
        solution = self.dfs()
        if solution == None:
            print("There is no solution")
        else:
            myClone = copy.deepcopy(self)
            myClone.mark(solution.path_solution())
            myClone.draw()
           
if __name__ == "__main__":
    myMaze = Maze(0, 0, 9, 9)
    myMaze.draw()
    myMaze.showSolutionDfs()

BSD Search

Breadth-first search

Breadth-first search is an algorithm for searching a tree data structure that traverses it layer-wise. It starts at the tree root or root node and explores all nodes at the present depth before moving on at the next depth level. It uses a queue to store visited nodes.

A queue is a data structure used to store a collection of objects or elements which works exactly like a real-life queue, that is, a FIFO data structure in which the element that is inserted first into the queue is the first one to be taken out. It models or enforces a first-come first-serve policy.

A queue

class Queue():
    def __init__(self):
        self.items = []

    def __len__(self): # It returns the number of elements in the queue.
        return len(self.items)

    def enqueue(self, item): # It adds a new item to the tail of the queue. The tail of the queue is the first element in our list.
        self.items.insert(0, item)

    def dequeue(self): # It removes the first element from the queue (the element at the head of the queue) and returns the removed item.
        if not self.is_empty():
            return self.items.pop()

    def is_empty(self): # It returns true when the queue is empty; otherwise, the function returns false. 
        return len(self.items) == 0

    def peek(self): # It returns the first element from the queue but does not remove it.
        if not self.is_empty():
            return self.items[-1]

    def __str__(self): # It allows us to print our queue.
        s = "{ "
        for i in range(len(self.items)):
            if i != len(self.items)-1:
                s += str(self.items[i]) + ", "
            else:
                s += str(self.items[i]) + " }"

        return s

def bfs(self): # It implements the bfs algorithm. It is a method of Maze.
        frontier = Queue() # The frontier is a queue that is used to find the next nodes to be visited.
        frontier.enqueue(Node(self.start)) # The starting node is labeled as discovered since it has already been visited, so it is added to the frontier.
        explored = { (self.start.x, self.start.y) } # A set is used to avoid re-traversing nodes.

        while not frontier.is_empty(): # While the frontier is not empty, there is a chance to find a solution.
            current_node = frontier.dequeue() # We dequeue the first element from the queue and set the corresponding element as our current node and position.
            current_position = current_node.position 

            if self.isGoalReached(current_position): # If the current position is the goal state, we return the current node as we have already found a solution, and the current node has all the information that we need to find the solution path by backtracking through its parents or predecessors.
                return current_node

            for direction in self.validDirections(current_position): # It generates all the node's successors or valid directions from the current position.
                if (direction.x, direction.y) in explored: # If a node's successor has already been explored, we discard it as useless.
                    continue

                explored.add( (direction.x, direction.y) ) # Otherwise, it is enqueued to the queue and added to the set.
                frontier.enqueue(Node(direction, current_node))

        return None # There is no solution to the maze.
     
 # It searches for the solution, clones the maze, and draws the solution in the cloned maze.
    def showSolutionBfs(self):
        solution = self.bfs()
        if solution == None:
            print("There is no solution")
        else:
            myClone = copy.deepcopy(self)
            myClone.mark(solution.path_solution())
            myClone.draw()
           
if __name__ == "__main__":
    myMaze = Maze(0, 0, 9, 9)
    myMaze.draw()
    myMaze.showSolutionBfs()

A* (a-star)

A is a very well-known informed search algorithm.* It aims to find a solution path to the given goal by selecting the node that minimizes the following function: f(n) = g(n) + h(n).

g(n) is the cost of the path from the start node to the current node “n”. h(n) is a heuristic function that estimates the cost from the current node to the goal. A* requires a priority queue to select the node with the minimum “estimated” cost to expand.

class PriorityQueue:
""" It implements a priority queue."""     
    def __init__(self):
        self.heap = []
 
    # It inserts a new key 'k'.
    def push(self, k):
        heappush(self.heap, k)
 
    # It removes the minimum element from the queue.
    def pop(self):
        return heappop(self.heap)
 
    def is_empty(self):
        return len(self.heap)==0

A common heuristic function is the Manhattan distance between position and goal. It is defined as |position.x-goal.x| + |position.y-goal.y|. It is computed by counting the number of moves along the maze that each node is displaced from its goal position.

     def heuristic (self, position):
        return abs(position.x-self.goal.x) + abs(position.y-self.goal.y)

     def astar(self): # It implements the A* algorithm. It is a method of Maze.
        frontier = PriorityQueue() # The frontier is a priority queue that is used to select the next nodes to be visited.
        frontier.push(Node(self.start, None, 0.0, self.heuristic(self.start))) # The starting node is labeled as discovered since it has already been visited, so it is added to the frontier.
        explored = { (self.start.x, self.start.y): 0.0 } # A dictionary is used to store nodes that has already been visited.

        while not frontier.is_empty(): # While the frontier is not empty, there is a chance to find a solution.
            current_node = frontier.pop() # We dequeue the first element from the priority queue and set the corresponding element as our current node and position. Each iteration, A* chooses the node on the frontier which minimizes f(n) = steps from start node + approximate steps to target.
            current_position = current_node.position

            if self.isGoalReached(current_position): # If the current position is the goal state, we return the current node as we have already found a solution, and the current node has all the information that we need to find the solution path by backtracking through its parents or predecessors.
                return current_node

            for direction in self.validDirections(current_position): # It generates all the node's successors or valid directions from the current position.
                new_cost = current_node.cost + 1 # It calculates both g and h for all the current node's successors. 

                if (direction.x, direction.y) not in explored or explored[(direction.x, direction.y)] > new_cost:  # If a node's successor has not been explored yet or _has a lower cost than the prior time it was visited_, we will add it to the frontier again. In other words, it is completely possible that some nodes may be visited twice or more.
                    explored[(direction.x, direction.y)] = new_cost
                    frontier.push(Node(direction, current_node, new_cost, self.heuristic(direction)))

        return None # There is no solution to the maze.
    
    def showSolutionA(self): # It searches for the solution, clones the maze, and draws the solution in the cloned maze.
        solution = self.astar()
        if solution == None:
            print("There is no solution")
        else:
            myClone = copy.deepcopy(self)
            myClone.mark(solution.path_solution())
            myClone.draw()

if __name__ == "__main__":
    myMaze = Maze(0, 0, 9, 9)
    myMaze.draw()
    myMaze.showSolutionA()
Bitcoin donation

JustToThePoint Copyright © 2011 - 2022 PhD. Máximo Núñez Alarcón, Anawim. ALL RIGHTS RESERVED. Bilingual e-books, articles, and videos to help your child and your entire family succeed, develop a healthy lifestyle, and have a lot of fun.

This website uses cookies to improve your navigation experience.
By continuing, you are consenting to our use of cookies, in accordance with our Cookies Policy and Website Terms and Conditions of use.