Heap

Author

Pham Nguyen Hung

Published

July 21, 2023

Heap

Definition

Heap, or priority queue, is a data structure that supports quick searching of the prioritized element such as the minimum or maximum in an array. It is commonly represented as a complete binary tree such as below

The tree has the following properties:

  1. Complete binary tree: All levels are filled except possibly the last layer. Nodes are placed as far to the left as possible.
  2. All descendants of a node are larger than the node itself. However, they do not need arranging in any further order.

While heap is commonly represented as a binary tree, it is often implemented with an array containing the level-order traversal of the tree instead.

The first node is usually omitted - the heap is counted from 1 onwards. Because of the complete binary tree property, the heap has an interesting property that at every node with position \(i\) in the array

Node Position
Left child \(2*i\)
Right child \(2*i+1\)
Parent \(i//2\)

Here is an example implementation of Min Heap i.e. a heap that gives you the minimum element in \(O(1)\) time complexity from LeetCode.

# Implementing "Min Heap"
class MinHeap:
    def __init__(self, heapSize):
        # Create a complete binary tree using an array
        # Then use the binary tree to construct a Heap
        self.heapSize = heapSize
        # the number of elements is needed when instantiating an array
        # heapSize records the size of the array
        self.minheap = [0] * (heapSize + 1)
        # realSize records the number of elements in the Heap
        self.realSize = 0

    # Function to add an element
    def add(self, element):
        self.realSize += 1
        # If the number of elements in the Heap exceeds the preset heapSize
        # print "Added too many elements" and return
        if self.realSize > self.heapSize:
            print("Added too many elements!")
            self.realSize -= 1
            return
        # Add the element into the array
        self.minheap[self.realSize] = element
        # Index of the newly added element
        index = self.realSize
        # Parent node of the newly added element
        # Note if we use an array to represent the complete binary tree
        # and store the root node at index 1
        # index of the parent node of any node is [index of the node / 2]
        # index of the left child node is [index of the node * 2]
        # index of the right child node is [index of the node * 2 + 1]
        parent = index // 2
        # If the newly added element is smaller than its parent node,
        # its value will be exchanged with that of the parent node 
        while (self.minheap[index] < self.minheap[parent] and index > 1):
            self.minheap[parent], self.minheap[index] = self.minheap[index], self.minheap[parent]
            index = parent
            parent = index // 2
    
    # Get the top element of the Heap
    def peek(self):
        return self.minheap[1]
    
    # Delete the top element of the Heap
    def pop(self):
        # If the number of elements in the current Heap is 0,
        # print "Don't have any elements" and return a default value
        if self.realSize < 1:
            print("Don't have any element!")
            return sys.maxsize
        else:
            # When there are still elements in the Heap
            # self.realSize >= 1
            removeElement = self.minheap[1]
            # Put the last element in the Heap to the top of Heap
            self.minheap[1] = self.minheap[self.realSize]
            self.realSize -= 1
            index = 1
            # When the deleted element is not a leaf node
            while (index <= self.realSize // 2):
                # the left child of the deleted element
                left = index * 2
                # the right child of the deleted element
                right = (index * 2) + 1
                # If the deleted element is larger than the left or right child
                # its value needs to be exchanged with the smaller value
                # of the left and right child
                if (self.minheap[index] > self.minheap[left] or self.minheap[index] > self.minheap[right]):
                    if self.minheap[left] < self.minheap[right]:
                        self.minheap[left], self.minheap[index] = self.minheap[index], self.minheap[left]
                        index = left
                    else:
                        self.minheap[right], self.minheap[index] = self.minheap[index], self.minheap[right]
                        index = right
                else:
                    break
            return removeElement
    
    # return the number of elements in the Heap
    def size(self):
        return self.realSize
    
    def __str__(self):
        return str(self.minheap[1 : self.realSize + 1])
        

if __name__ == "__main__":
        # Test cases
        minHeap = MinHeap(5)
        minHeap.add(3)
        minHeap.add(1)
        minHeap.add(2)
        # [1,3,2]
        print(minHeap)
        # 1
        print(minHeap.peek())
        # 1
        print(minHeap.pop())
        # 2
        print(minHeap.pop())
        # 3
        print(minHeap.pop())
        minHeap.add(4)
        minHeap.add(5)
        # [4,5]
        print(minHeap)

In Python, we usually just need to use the built-in heapq module to work with problem requiring a Min Heap.

Problems

1. Kth Largest Element in a Stream

Intuition

I got some trouble because Python’s heapq provides support for Min Heap, but not Max Heap. However, I was then given a clever hint: the \(k^{th}\) largest element is the smallest element in the largest \(k\) elements (note the singular - plural nouns). Hence, a a min heap can do the job perfectly.

Algorithm

The meat is defining the .add() method. To implement this, I need to define self.heap and self.limit (k) beforehand.

.add(): There are two case: 1. If the heap has not reached its limit, use heapq.heappush(). 2. Else, use heapq.heappushpop(). 3. Return the first element i.e., the minimum element.

Complexity

  • Time complexity: \(O(n \times \log (k))\): Every call with heapq.heappush() or heapq.heappushpop() with .add() costs \(O(\log(k))\). In the constructor method, \(n\) such calls happen - hence the time complexity.

  • Space complexity: \(O(k)\): The internal self.heap only contains up to \(k\) elements at a time.

Code

import heapq

class KthLargest:

    def __init__(self, k: int, nums: List[int]) -> None:
        self.heap = []
        self.limit = k
        for num in nums:
            self.add(num)

    def add(self, val: int) -> int:
        if len(self.heap) < self.limit:
            heapq.heappush(self.heap, val)
        else:
            heapq.heappushpop(self.heap, val)
        return self.heap[0]


# Your KthLargest object will be instantiated and called as such:
# obj = KthLargest(k, nums)
# param_1 = obj.add(val)

2. Last Stone Weight

Intuition

This is a problem that cannot avoid max heap. Well, luckily it can be constructed with a min heap - all I have to do is negating the value of the element.

Algorithm

  1. Negating the value of every element in the inpyt array stones.
  2. Heapify stones (in-place, linear time complexity).
  3. While the length is larger than 1:
    1. Pop from the heap two times. If the two values are different, push the remaining stone into the heap.
  4. Return per condition.

Complexity

  • Time complexity: \(O(n \times \log (n))\): The dominating operation is popping from the heap. The algorithm does that \(n\) times, each costs \(O(\log(n))\).

  • Space complexity: \(O(1)\): All operation happens in-place with pointer manipulation.

Code

class Solution:
    def lastStoneWeight(self, stones: List[int]) -> int:        
        stones = [-i for i in stones]
        heapq.heapify(stones)

        while len(stones) > 1:
            stone1 = heapq.heappop(stones)
            stone2 = heapq.heappop(stones)

            if stone1 != stone2:
                heapq.heappush(stones, stone1 - stone2)

        if len(stones) == 1:
            return - stones[0]
        return 0

3. K Closest Points to Origin

Intuition

I know that I will need to calculate the distance from origin of every coordinates. Hence, I decided to push that information into the input array by turning each element from an array of coordinates into a tuple of (distance, [coordinates]). Heapify the input array will consider this distance first before the coordinates, which does not matter (I can return equidistant coordinates in any order).

Algorithm

  1. Modify each element of points in-place.
  2. Heapify points.
  3. Construct the result list and return.

Complexity

  • Time complexity: \(O(n + k\log(n))\): Modify and heapify the array both takes \(O(n)\) time. The algorithm pop form the heap \(k\) times, each time takes \(O(\log(n)\) time. I don’t know which one of the terms will dominate, so I include both.

  • Space complexity: \(O(n)\): Ignoring the return list, it costs \(O(n)\) space in modifying the input array.

Code

import heapq
from math import sqrt

class Solution:
    def kClosest(self, points: List[List[int]], k: int) -> List[List[int]]:
        for i in range(len(points)):
            x, y = points[i]
            points[i] = (sqrt(x**2 + y**2), [x, y])
        heapq.heapify(points)

        return [heapq.heappop(points)[1] for _ in range(k)]        

4. Kth Largest Element in an Array

Intuition

Same as above, this problem can solved with a min heap maintaining only the largest \(k\) elements in the array.

Algorithm

  1. Initialize a new heap.
  2. Iterate the array. Push to the heap if the length has not reached \(k\), else push-pop.
  3. Return the first element in the heap.

Complexity

  • Time complexity: \(O(n \log(k))\): Iterate the array of length \(n\), each operation takes \(O(\log(k))\) time.

  • Space complexity: \(O(k)\): The size of the new heap.

Code

import heapq

class Solution:
    def findKthLargest(self, nums: List[int], k: int) -> int:
        heap = []
        for num in nums:
            if len(heap) < k:
                heapq.heappush(heap, num)
            else:
                heapq.heappushpop(heap, num)
        return heap[0]

5. Find Median from Data Stream

Intuition

The trick is having two heaps: one Max Heap and one Min Heap for each half of the sorted array. Afterwards, the median is just a calculation with the top of the second heap i.e., the smallest of the second sorted half and the top of the first heap i.e., the largest of the first sorted half.

Algorithm

  • __init__:
    • Initialize two heaps.
  • addNum:
    • If the two heaps are currently of the same size, push the number to one of them. I chose the Min Heap. To ensure that the each heap contains only one half of the sorted array, heappushpop is called with the number on the other half first.
    • Else, perform the operation to the other heap.
  • findMedian:
    • If the two heaps are of different length, just return the top of the Min Heap as it is designed to have the larger length i.e., contain the median in this case.
    • If the two heaps are of equal length, the median is calculated as the average of the two tops. Some math is required as the Max Heap stores the negative values.

Complexity

  • Time complexity:

    • addNum - \(O(\log n)\)
    • findMedian - \(O(1)\)
  • Space complexity: \(O(n)\) - only needs to store the data stream itself as every method is in-place.

Code

from heapq import heappush, heappushpop

class MedianFinder:
    """
    Keeping two heaps: a Max Heap for the first part of the array, and a
    Min Heap for the second part of the array. The median can be quickly
    calculated from the top of the two heaps
    """

    def __init__(self):
        self.first_half = []
        self.second_half = []

    def addNum(self, num: int) -> None:
        if len(self.first_half) == len(self.second_half):
            heappush(self.second_half, -heappushpop(self.first_half, -num))
        else:
            heappush(self.first_half, -heappushpop(self.second_half, num))

    def findMedian(self) -> float:
        if len(self.first_half) == len(self.second_half):
            return (self.second_half[0] - self.first_half[0]) / 2
        else:
            return float(self.second_half[0])


# Your MedianFinder object will be instantiated and called as such:
# obj = MedianFinder()
# obj.addNum(num)
# param_2 = obj.findMedian()

6. Task Scheduling

Intuition

I misunderstood the problem. So the n cooldown period is actually the slots between 2 occurrences of the same task such as this for n == 3 and only task "A":

A _ _ A _ _ A
0 1 2 3 4 5 6

In the idle time between, different tasks e.g., "B" or "C" can be slotted in

A B _ A C _ A
0 1 2 3 4 5 6

The main constraint is the most frequent task. Hence, the solution can be constructed by first adding all the most frequent tasks in first, and then slotting the other in between. This will lead to 3 cases:

  1. The leftover tasks do not take up all the slots in the middle. This means that there will be idle slots. The time needed will be the length of the skeleton.
  2. The leftover tasks take up more than the slots in the middle. In that case, there will be no idle slots, and the result slots when put into an array will be same as the input array. Hence, the time required is the length of the input array.
  3. The leftover tasks do not take up all the slots in the middle but still spill over. This happens when there are more than 1 task with the same max number of occurrences. Slotting them in will lead to one more unit time taken at the end. To account for this, increment the time measured so far whenever encountered.

This is the best approach, but not the heap approach. For the heap approach, it is essentially constructing each part of the skeleton iteratively by greedily adding in elements in the descending order (a Max Heap is used to store the number of occurrences)

A B C | A D E | A F G | A i  i  | A  i  i  | A
0 1 2 | 3 4 5 | 6 7 8 | 9 10 11 | 12 13 14 | 15

Algorithm

  • Optimal
    1. Initialize variables time to 0, task_to_times with Counter to count the occurrences of each task.
    2. Get the maximum number of occurrences longest with Counter.most_common.
    3. Construct the skeleton answer with (longest - 1) * (n + 1). longest - 1 is the number of gaps between the tasks, and n + 1 to account for the task it self. By right, I do not include the last occurrence, which will be counted later.
    4. Loop over the times a task occurs in task_to_times, on encountering the longest time again, add 1 to answer. This also deals with the third case above.
    5. Return the maximum between answer and the length of the input array.
  • Heap
    1. Initialize a Max Heap of number of occurrences of a task.
    2. While heap is not empty:
      1. Create a temporary list.
      2. Iterate for n times:
        1. Increment time.
        2. Pop from heap.
        3. If the value is larger than 1, decrement it and add it to temp.
        4. If heap and temp are both empty after the step, break as the algorithm has reached the end.
      3. Push back the items in temp into heap.

Complexity

  • Time complexity: \(O(n \log n)\) in both cases, for \(n\) the length of the input array. For the first case, this is the cost of constructing Counter and getting the maximum frequency. For the second case, the loop is executed equal or very close to \(n\) times, each iteration costs \(O(\log n)\) time.

  • Space complexity: \(O(n)\) - the cost of constructing the Counter and/or the heap.

Code

  • Math
from collections import Counter
from typing import List

class Solution:
    def leastInterval(self, tasks: List[str], n: int) -> int:
        time = 0
        task_to_times = Counter(tasks)
        longest = task_to_times.most_common(1)[0][-1] # List[Tuple[str, int]]
        answer = (longest - 1) * (n + 1)

        for times in task_to_times.values():
            if times == longest:
                answer += 1    
        return max(len(tasks), answer)
  • Heap
from collections import Counter
from typing import List
from heapq import heappush, heappop

class Solution:
    def leastInterval(self, tasks: List[str], n: int) -> int:
        task_to_times = Counter(tasks)
        heap = []

        for times in task_to_times.values():
            heappush(heap, -times)
        
        time = 0
        while heap:
            temp = []
            for _ in range(n + 1):
                time += 1
                if heap:
                    most_frequent_task = heappop(heap)
                    if most_frequent_task < -1:
                        temp.append(most_frequent_task + 1)
                        
                if not (heap or temp):
                    break

            for item in temp:
                heappush(heap, item)
        
        return time

7. Merge k Sorted Lists

Intuition

Heap merge, the combination of heap sort and merge sort.

Joking aside, the problem can be solved trivially with a heap. Just iterate through each lists, add the node value to the heap, and then construct the combined list. Easy.

You can also do this in-place, but I don’t want to mess up with the input values.

Algorithm

  1. Iterate through every linked list in the list, push the value to the heap.
  2. Pop from the heap until empty, construct a new ListNode, and connect the node to the new linked list. For implementation, it is always great to create a sentinel head.
  3. Return.

Complexity

  • Time complexity: \(O(n \log n)\) for \(n\) the total number of nodes in all linked lists.

  • Space complexity: \(O(n)\) for the new merged linked list.

Code

# Definition for singly-linked list.
# class ListNode:
#     def __init__(self, val=0, next=None):
#         self.val = val
#         self.next = next
from heapq import heappush, heappop
from typing import List, Optional

class Solution:
    def mergeKLists(self, lists: List[Optional[ListNode]]) -> Optional[ListNode]:
        no_of_lists = len(lists)
        sentinel_head = ListNode()
        current = sentinel_head
        heap = []
        
        for list_index in range(no_of_lists):
            while lists[list_index] is not None:
                heappush(heap, lists[list_index].val)
                lists[list_index] = lists[list_index].next
        
        for _ in range(len(heap)):
            val = heappop(heap)
            current.next = ListNode(val)
            current = current.next
        
        return sentinel_head.next