Heap
Definition
A heap, the structure that usually backs a priority queue, supports fast retrieval of the highest-priority element, such as the minimum or maximum of a collection. It is commonly represented as a complete binary tree.
The tree has the following properties:
- Complete binary tree: All levels are filled except possibly the last layer. Nodes are placed as far to the left as possible.
- Heap order (stated here for a Min Heap): every descendant of a node is greater than or equal to the node itself. Beyond that, the nodes need not be arranged in any further order.
While a heap is commonly drawn as a binary tree, it is usually implemented with an array holding the level-order traversal of the tree.
Index 0 of the array is usually left unused, so the heap is counted from 1 onwards. Because the tree is complete, the neighbours of the node at position \(i\) in the array can be found by arithmetic alone:
| Node | Position |
|---|---|
| Left child | \(2i\) |
| Right child | \(2i+1\) |
| Parent | \(\lfloor i/2 \rfloor\) |
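The index arithmetic in the table can be checked with a small sketch. The helper names `children_and_parent` and `is_min_heap` are made up for illustration, and index 0 holds a placeholder as described above.

```python
from typing import List, Tuple


def children_and_parent(i: int) -> Tuple[int, int, int]:
    """Return (left child, right child, parent) positions for 1-based position i."""
    return 2 * i, 2 * i + 1, i // 2


def is_min_heap(arr: List[int]) -> bool:
    """Check the Min Heap order on a 1-indexed array (arr[0] is an unused placeholder)."""
    size = len(arr) - 1
    # Every node except the root must be at least as large as its parent.
    for i in range(2, size + 1):
        if arr[i // 2] > arr[i]:
            return False
    return True


level_order = [0, 1, 3, 2, 7, 4]  # index 0 unused; the tree starts at index 1
print(children_and_parent(2))     # (4, 5, 1)
print(is_min_heap(level_order))   # True
```

Checking each node only against its direct parent is enough, because the order then carries over to all descendants.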
Here is an example implementation of a Min Heap from LeetCode, i.e., a heap that returns the minimum element in \(O(1)\) time.
# Implementing "Min Heap"
import sys


class MinHeap:
    def __init__(self, heapSize):
        # Create a complete binary tree using an array
        # Then use the binary tree to construct a Heap
        self.heapSize = heapSize
        # the number of elements is needed when instantiating an array
        # heapSize records the size of the array
        self.minheap = [0] * (heapSize + 1)
        # realSize records the number of elements in the Heap
        self.realSize = 0

    # Function to add an element
    def add(self, element):
        self.realSize += 1
        # If the number of elements in the Heap exceeds the preset heapSize
        # print "Added too many elements" and return
        if self.realSize > self.heapSize:
            print("Added too many elements!")
            self.realSize -= 1
            return
        # Add the element into the array
        self.minheap[self.realSize] = element
        # Index of the newly added element
        index = self.realSize
        # Parent node of the newly added element
        # Note if we use an array to represent the complete binary tree
        # and store the root node at index 1
        # index of the parent node of any node is [index of the node / 2]
        # index of the left child node is [index of the node * 2]
        # index of the right child node is [index of the node * 2 + 1]
        parent = index // 2
        # If the newly added element is smaller than its parent node,
        # its value will be exchanged with that of the parent node
        while index > 1 and self.minheap[index] < self.minheap[parent]:
            self.minheap[parent], self.minheap[index] = self.minheap[index], self.minheap[parent]
            index = parent
            parent = index // 2

    # Get the top element of the Heap
    def peek(self):
        return self.minheap[1]

    # Delete the top element of the Heap
    def pop(self):
        # If the number of elements in the current Heap is 0,
        # print "Don't have any elements" and return a default value
        if self.realSize < 1:
            print("Don't have any element!")
            return sys.maxsize
        else:
            # When there are still elements in the Heap
            # self.realSize >= 1
            removeElement = self.minheap[1]
            # Put the last element in the Heap to the top of Heap
            self.minheap[1] = self.minheap[self.realSize]
            self.realSize -= 1
            index = 1
            # While the element moved to the top is not a leaf node
            while index <= self.realSize // 2:
                # the left child of the current node
                left = index * 2
                # the right child of the current node
                # (it may sit one slot past the last element; that slot still
                # holds the value just moved to the top, so the comparison is harmless)
                right = (index * 2) + 1
                # If the current element is larger than the left or right child
                # its value needs to be exchanged with the smaller value
                # of the left and right child
                if (self.minheap[index] > self.minheap[left] or self.minheap[index] > self.minheap[right]):
                    if self.minheap[left] < self.minheap[right]:
                        self.minheap[left], self.minheap[index] = self.minheap[index], self.minheap[left]
                        index = left
                    else:
                        self.minheap[right], self.minheap[index] = self.minheap[index], self.minheap[right]
                        index = right
                else:
                    break
            return removeElement

    # return the number of elements in the Heap
    def size(self):
        return self.realSize

    def __str__(self):
        return str(self.minheap[1 : self.realSize + 1])


if __name__ == "__main__":
    # Test cases
    minHeap = MinHeap(5)
    minHeap.add(3)
    minHeap.add(1)
    minHeap.add(2)
    # [1,3,2]
    print(minHeap)
    # 1
    print(minHeap.peek())
    # 1
    print(minHeap.pop())
    # 2
    print(minHeap.pop())
    # 3
    print(minHeap.pop())
    minHeap.add(4)
    minHeap.add(5)
    # [4,5]
    print(minHeap)
In Python, the built-in heapq module is usually all we need for problems requiring a Min Heap.
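As a quick orientation, here is a minimal sketch of the heapq calls used in the problems below. The sample values are arbitrary. Note that heapq works on a plain 0-indexed list, so the minimum sits at index 0 and the children of position i are at 2i + 1 and 2i + 2, unlike the 1-indexed array above.

```python
import heapq

nums = [5, 1, 4, 2, 3]
heapq.heapify(nums)                    # rearrange the list in place into a Min Heap, O(n)
smallest = nums[0]                     # peek: the minimum is always at index 0
heapq.heappush(nums, 0)                # insert a value, O(log n)
popped = heapq.heappop(nums)           # remove and return the minimum, O(log n)
replaced = heapq.heappushpop(nums, 6)  # push 6, then pop the minimum, in one call

print(smallest, popped, replaced)      # 1 0 1
```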
Problems
1. Kth Largest Element in a Stream
Intuition
I ran into some trouble because Python’s heapq provides a Min Heap, but not a Max Heap. However, I was then given a clever hint: the \(k^{th}\) largest element is the smallest element among the largest \(k\) elements (note the singular and plural nouns). Hence, a min heap can do the job perfectly.
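The hint can be sanity-checked on a small array. This is only an illustration with made-up values, using heapq.nlargest to pick out the largest \(k\) elements:

```python
import heapq

nums = [4, 5, 8, 2, 3, 9]
k = 3

k_largest = heapq.nlargest(k, nums)              # the largest k elements: [9, 8, 5]
kth_largest = sorted(nums, reverse=True)[k - 1]  # the k-th largest by full sorting: 5

heapq.heapify(k_largest)                         # view those k elements as a Min Heap
print(k_largest[0], kth_largest)                 # 5 5
```

The top of the Min Heap built from the largest \(k\) elements is exactly the \(k^{th}\) largest element.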
Algorithm
The core of the solution is the .add() method. To implement it, I need to define self.heap and self.limit (k) beforehand.
.add(): There are two cases:
1. If the heap has not reached its limit, use heapq.heappush().
2. Else, use heapq.heappushpop().
In both cases, return the first element, i.e., the minimum element.
Complexity
Time complexity: \(O(n \log k)\): every call to .add() costs \(O(\log k)\), whether it goes through heapq.heappush() or heapq.heappushpop(). The constructor makes \(n\) such calls, hence the bound.
Space complexity: \(O(k)\): the internal self.heap holds at most \(k\) elements at a time.
Code
import heapq
from typing import List


class KthLargest:
    def __init__(self, k: int, nums: List[int]) -> None:
        self.heap = []
        self.limit = k
        for num in nums:
            self.add(num)

    def add(self, val: int) -> int:
        if len(self.heap) < self.limit:
            heapq.heappush(self.heap, val)
        else:
            heapq.heappushpop(self.heap, val)
        return self.heap[0]


# Your KthLargest object will be instantiated and called as such:
# obj = KthLargest(k, nums)
# param_1 = obj.add(val)
2. Last Stone Weight
Intuition
This is a problem that cannot avoid a max heap. Luckily, one can be simulated with a min heap: all I have to do is negate the value of each element.
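A minimal sketch of the negation trick, with sample weights chosen for illustration: values are negated on the way in and negated again on the way out, so the Min Heap hands back the largest original value first.

```python
import heapq

weights = [2, 7, 4, 1, 8, 1]
max_heap = [-w for w in weights]  # store negated values
heapq.heapify(max_heap)

heaviest = -heapq.heappop(max_heap)         # largest original value: 8
second_heaviest = -heapq.heappop(max_heap)  # next largest: 7

# Push the difference back, negated like everything else in the heap.
heapq.heappush(max_heap, -(heaviest - second_heaviest))

print(heaviest, second_heaviest, -max_heap[0])  # 8 7 4
```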
Algorithm
- Negate the value of every element in the input array stones.
- Heapify stones (in-place, linear time complexity).
- While the length is larger than 1:
  - Pop from the heap two times. If the two values are different, push the remaining stone into the heap.
- Return the weight of the last remaining stone, or 0 if no stone is left.
Complexity
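Time complexity: \(O(n \log n)\): The dominating operation is popping from the heap. The algorithm does that \(O(n)\) times, each pop costing \(O(\log n)\).
Space complexity: \(O(1)\) extra if the values are negated in place, since heapify and the push/pop calls all work on the same list. Note that the list comprehension in the code below builds a new list, which costs \(O(n)\) as written.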
Code
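import heapq
from typing import List


class Solution:
    def lastStoneWeight(self, stones: List[int]) -> int:
        # Negate every weight so that the Min Heap behaves like a Max Heap
        stones = [-i for i in stones]

        heapq.heapify(stones)
        while len(stones) > 1:
            stone1 = heapq.heappop(stones)
            stone2 = heapq.heappop(stones)

            if stone1 != stone2:
                # stone1 <= stone2 here, so the difference is already negated
                heapq.heappush(stones, stone1 - stone2)

        if len(stones) == 1:
            return -stones[0]
        return 0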
3. K Closest Points to Origin
Intuition
I know that I will need to calculate the distance from the origin for every point. Hence, I decided to push that information into the input array by turning each element from an array of coordinates into a tuple of (distance, [coordinates]). Heapifying the input array then compares the distance first and the coordinates second; the latter does not matter, as I can return equidistant points in any order.
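The tuple ordering can be seen in a short sketch. The points are made up, and as a deliberate deviation from the text, this version keys on the squared distance, which gives the same ordering as the true distance while skipping the square root.

```python
import heapq

points = [[3, 3], [-2, 4], [1, 1], [-1, 1]]

# Tuples compare element by element: the squared distance decides first,
# and the coordinate list only breaks ties between equidistant points.
keyed = [(x * x + y * y, [x, y]) for x, y in points]
heapq.heapify(keyed)

closest_two = [heapq.heappop(keyed)[1] for _ in range(2)]
print(closest_two)  # [[-1, 1], [1, 1]]
```

Both returned points have squared distance 2; the tie is broken by comparing the coordinate lists, which is harmless here because any order is accepted.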
Algorithm
- Modify each element of points in-place.
- Heapify points.
- Construct the result list and return.
Complexity
Time complexity: \(O(n + k\log n)\): Modifying and heapifying the array each take \(O(n)\) time. The algorithm pops from the heap \(k\) times, each pop taking \(O(\log n)\) time. Which term dominates depends on \(k\), so I include both.
Space complexity: \(O(n)\): Ignoring the returned list, wrapping every point in a (distance, [coordinates]) tuple costs \(O(n)\) space.
Code
import heapq
from math import sqrt
from typing import List


class Solution:
    def kClosest(self, points: List[List[int]], k: int) -> List[List[int]]:
        for i in range(len(points)):
            x, y = points[i]
            points[i] = (sqrt(x**2 + y**2), [x, y])

        heapq.heapify(points)
        return [heapq.heappop(points)[1] for _ in range(k)]
4. Kth Largest Element in an Array
Intuition
As with Kth Largest Element in a Stream above, this problem can be solved with a min heap that maintains only the largest \(k\) elements of the array.
Algorithm
- Initialize a new heap.
- Iterate the array. Push to the heap if the length has not reached \(k\), else push-pop.
- Return the first element in the heap.
Complexity
Time complexity: \(O(n \log(k))\): Iterate the array of length \(n\), each operation takes \(O(\log(k))\) time.
Space complexity: \(O(k)\): The size of the new heap.
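To see why the heap never grows beyond \(k\) elements, the following sketch records the heap contents after each step. The function name `trace_k_largest` and the sample input are assumptions for illustration only.

```python
import heapq
from typing import List


def trace_k_largest(nums: List[int], k: int) -> List[List[int]]:
    """Return a sorted snapshot of the size-k Min Heap after each element."""
    heap: List[int] = []
    snapshots = []
    for num in nums:
        if len(heap) < k:
            heapq.heappush(heap, num)
        else:
            # Push the new value and drop the smallest, keeping only k values
            heapq.heappushpop(heap, num)
        snapshots.append(sorted(heap))
    return snapshots


for snapshot in trace_k_largest([3, 2, 1, 5, 6, 4], 2):
    print(snapshot)
# [3]
# [2, 3]
# [2, 3]
# [3, 5]
# [5, 6]
# [5, 6]
```

The first value of the last snapshot, 5, is the 2nd largest element of the array.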
Code
import heapq
from typing import List


class Solution:
    def findKthLargest(self, nums: List[int], k: int) -> int:
        heap = []
        for num in nums:
            if len(heap) < k:
                heapq.heappush(heap, num)
            else:
                heapq.heappushpop(heap, num)
        return heap[0]
5. Find Median from Data Stream
Intuition
The trick is having two heaps: one Max Heap and one Min Heap for each half of the sorted array. Afterwards, the median is just a calculation with the top of the second heap i.e., the smallest of the second sorted half and the top of the first heap i.e., the largest of the first sorted half.
Algorithm
__init__:
- Initialize two heaps.
addNum:
- If the two heaps are currently of the same size, push the number to one of them. I chose the Min Heap. To ensure that each heap contains only one half of the sorted array, heappushpop is called with the number on the other half first, and the value it returns is what gets pushed.
- Else, perform the same operation with the roles of the two heaps swapped.
findMedian:
- If the two heaps are of different length, just return the top of the Min Heap, as it is designed to be the longer one, i.e., it contains the median in this case.
- If the two heaps are of equal length, the median is the average of the two tops. Since the Max Heap stores negated values, its top has to be negated first, which turns the sum into a subtraction.
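The steps above can be traced with a standalone sketch that reports the median after every insertion. The function name `running_medians` and the sample stream are made up; the two lists mirror the two heaps described above.

```python
from heapq import heappush, heappushpop
from typing import List


def running_medians(stream: List[int]) -> List[float]:
    first_half: List[int] = []   # Max Heap of the smaller half, stored negated
    second_half: List[int] = []  # Min Heap of the larger half
    medians = []
    for num in stream:
        if len(first_half) == len(second_half):
            # Route num through the smaller half; its largest value moves up
            heappush(second_half, -heappushpop(first_half, -num))
        else:
            # Route num through the larger half; its smallest value moves down
            heappush(first_half, -heappushpop(second_half, num))

        if len(first_half) == len(second_half):
            medians.append((second_half[0] - first_half[0]) / 2)
        else:
            medians.append(float(second_half[0]))
    return medians


print(running_medians([5, 2, 8, 1]))  # [5.0, 3.5, 5.0, 3.5]
```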
Complexity
Time complexity:
- addNum: \(O(\log n)\)
- findMedian: \(O(1)\)
Space complexity: \(O(n)\): the two heaps together store the whole data stream, and no other storage is needed.
Code
from heapq import heappush, heappushpop


class MedianFinder:
    """
    Keeping two heaps: a Max Heap for the first part of the array, and a
    Min Heap for the second part of the array. The median can be quickly
    calculated from the top of the two heaps
    """

    def __init__(self):
        self.first_half = []
        self.second_half = []

    def addNum(self, num: int) -> None:
        if len(self.first_half) == len(self.second_half):
            heappush(self.second_half, -heappushpop(self.first_half, -num))
        else:
            heappush(self.first_half, -heappushpop(self.second_half, num))

    def findMedian(self) -> float:
        if len(self.first_half) == len(self.second_half):
            return (self.second_half[0] - self.first_half[0]) / 2
        else:
            return float(self.second_half[0])


# Your MedianFinder object will be instantiated and called as such:
# obj = MedianFinder()
# obj.addNum(num)
# param_2 = obj.findMedian()
6. Task Scheduling
Intuition
I misunderstood the problem at first. The cooldown period n is actually the number of slots between 2 occurrences of the same task, such as this for n == 2 and only task "A":
A _ _ A _ _ A
0 1 2 3 4 5 6
In the idle time in between, different tasks, e.g., "B" or "C", can be slotted in:
A B _ A C _ A
0 1 2 3 4 5 6
The main constraint is the most frequent task. Hence, the solution can be constructed by first laying out all occurrences of the most frequent task as a skeleton, and then slotting the others in between. This leads to 3 cases:
- The leftover tasks do not take up all the slots in the middle. This means that there will be idle slots. The time needed is the length of the skeleton.
- The leftover tasks take up more than the slots in the middle. In that case, there will be no idle slots, and the resulting schedule, written out as an array, has the same length as the input array. Hence, the time required is the length of the input array.
- The leftover tasks do not take up all the slots in the middle but still spill over. This happens when more than 1 task shares the same maximum number of occurrences. Slotting them in takes one more unit of time at the end for each such task. To account for this, increment the time measured so far whenever one is encountered.
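As a worked instance of the third case, assume the hypothetical input A A A B B B with n = 2 (values chosen for illustration). The maximum number of occurrences is 3, two tasks share it, and the input has 6 tasks:

```latex
\[
\begin{aligned}
\text{skeleton} &= (3 - 1) \times (2 + 1) = 6 \\
\text{time} &= 6 + 2 = 8 \\
\text{answer} &= \max(8,\ 6) = 8
\end{aligned}
\]
```

This matches the schedule A B _ A B _ A B, which occupies 8 slots.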
This is the best approach, but not the heap approach. The heap approach essentially constructs each part of the skeleton iteratively, greedily adding tasks in descending order of remaining occurrences (a Max Heap is used to store the number of occurrences). Below, i denotes an idle slot:
A B C | A D E | A F G | A i i | A i i | A
0 1 2 | 3 4 5 | 6 7 8 | 9 10 11 | 12 13 14 | 15
Algorithm
- Optimal
  - Initialize variables time to 0, task_to_times with Counter to count the occurrences of each task.
  - Get the maximum number of occurrences longest with Counter.most_common.
  - Construct the skeleton answer with (longest - 1) * (n + 1). longest - 1 is the number of gaps between the occurrences of the task, and n + 1 accounts for the task itself plus its cooldown. By right, I do not include the last occurrence, which will be counted later.
  - Loop over the times a task occurs in task_to_times. Whenever times equals longest, add 1 to answer. The most frequent task itself triggers this once, which supplies the last occurrence left out of the skeleton. This also deals with the third case above.
  - Return the maximum between answer and the length of the input array.
- Heap
  - Initialize a Max Heap of the number of occurrences of each task.
  - While heap is not empty:
    - Create a temporary list temp.
    - Iterate n + 1 times:
      - Increment time.
      - If heap is not empty, pop from it. If the popped count is larger than 1, decrement it and add it to temp.
      - If heap and temp are both empty after the step, break as the algorithm has reached the end.
    - Push back the items in temp into heap.
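To make the heap steps above concrete, here is a sketch that returns the schedule itself instead of only its length. The function name `build_schedule`, the "idle" marker, and the (count, task) tuples are assumptions added for illustration; the loop structure follows the steps above.

```python
from collections import Counter
from heapq import heapify, heappop, heappush
from typing import List


def build_schedule(tasks: List[str], n: int) -> List[str]:
    # Max Heap via negation: (negated remaining count, task name)
    heap = [(-count, task) for task, count in Counter(tasks).items()]
    heapify(heap)

    schedule: List[str] = []
    while heap:
        temp = []
        # One window of n + 1 slots: each task appears at most once in it
        for _ in range(n + 1):
            if heap:
                count, task = heappop(heap)
                schedule.append(task)
                if count < -1:
                    temp.append((count + 1, task))  # one fewer occurrence left
            else:
                schedule.append("idle")
            if not (heap or temp):
                break  # nothing left to schedule
        for item in temp:
            heappush(heap, item)
    return schedule


print(" ".join(build_schedule(list("AAABBB"), 2)))
# A B idle A B idle A B
```

The length of the returned list equals the value the heap solution computes as time.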
Complexity
Time complexity, writing \(N\) for the length of the input array to avoid a clash with the cooldown n:
- Math: \(O(N)\), the cost of constructing the Counter and getting the maximum frequency.
- Heap: the loop runs once per time unit of the schedule, idle slots included, so the number of iterations equals the returned answer, which can exceed \(N\) when there are idle slots. Each iteration costs at most \(O(\log m)\) for \(m\) distinct tasks.
Space complexity: \(O(N)\) in the worst case, the cost of constructing the Counter and/or the heap.
Code
- Math
from collections import Counter
from typing import List


class Solution:
    def leastInterval(self, tasks: List[str], n: int) -> int:
        time = 0
        task_to_times = Counter(tasks)
        longest = task_to_times.most_common(1)[0][-1]  # List[Tuple[str, int]]
        answer = (longest - 1) * (n + 1)

        for times in task_to_times.values():
            if times == longest:
                answer += 1
        return max(len(tasks), answer)
- Heap
from collections import Counter
from typing import List
from heapq import heappush, heappop


class Solution:
    def leastInterval(self, tasks: List[str], n: int) -> int:
        task_to_times = Counter(tasks)
        heap = []

        for times in task_to_times.values():
            heappush(heap, -times)

        time = 0
        while heap:
            temp = []
            for _ in range(n + 1):
                time += 1
                if heap:
                    most_frequent_task = heappop(heap)
                    if most_frequent_task < -1:
                        temp.append(most_frequent_task + 1)

                if not (heap or temp):
                    break
            for item in temp:
                heappush(heap, item)
        return time
7. Merge k Sorted Lists
Intuition
Heap merge, the combination of heap sort and merge sort.
Joking aside, the problem can be solved trivially with a heap. Just iterate through each list, add every node value to the heap, and then construct the combined list. Easy.
You can also do this in-place, but I don’t want to mess with the input nodes.
Algorithm
- Iterate through every linked list in the list, pushing each value to the heap.
- Pop from the heap until empty, construct a new ListNode for each value, and connect the node to the new linked list. For implementation, it is always great to create a sentinel head.
- Return.
Complexity
Time complexity: \(O(n \log n)\) for \(n\) the total number of nodes in all linked lists.
Space complexity: \(O(n)\) for the new merged linked list.
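For comparison, here is a sketch of a different, commonly used variant that is not the approach described above: it keeps only the current head of each list in the heap, so the heap holds at most \(k\) entries and the time drops to \(O(n \log k)\) for \(k\) lists. The ListNode class is a minimal stand-in for LeetCode's definition, and `merge_k_lists`, `from_list`, and `to_list` are hypothetical helper names.

```python
from heapq import heappop, heappush
from typing import List, Optional


class ListNode:
    """Minimal stand-in for LeetCode's ListNode."""

    def __init__(self, val: int = 0, next: "Optional[ListNode]" = None):
        self.val = val
        self.next = next


def merge_k_lists(lists: List[Optional[ListNode]]) -> Optional[ListNode]:
    heap = []
    for i, node in enumerate(lists):
        if node is not None:
            # The list index breaks ties, so ListNode objects are never compared
            heappush(heap, (node.val, i, node))

    sentinel_head = ListNode()
    current = sentinel_head
    while heap:
        val, i, node = heappop(heap)
        current.next = ListNode(val)  # new node, input nodes stay untouched
        current = current.next
        if node.next is not None:
            heappush(heap, (node.next.val, i, node.next))
    return sentinel_head.next


def from_list(values: List[int]) -> Optional[ListNode]:
    """Build a linked list from a Python list (helper for the demo)."""
    sentinel_head = ListNode()
    current = sentinel_head
    for value in values:
        current.next = ListNode(value)
        current = current.next
    return sentinel_head.next


def to_list(node: Optional[ListNode]) -> List[int]:
    """Collect linked list values into a Python list (helper for the demo)."""
    values = []
    while node is not None:
        values.append(node.val)
        node = node.next
    return values


merged = merge_k_lists([from_list([1, 4, 5]), from_list([1, 3, 4]), from_list([2, 6])])
print(to_list(merged))  # [1, 1, 2, 3, 4, 4, 5, 6]
```

The trade-off is a slightly more involved loop in exchange for a smaller heap; the output list still costs \(O(n)\) space.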
Code
# Definition for singly-linked list.
# class ListNode:
#     def __init__(self, val=0, next=None):
#         self.val = val
#         self.next = next
from heapq import heappush, heappop
from typing import List, Optional


class Solution:
    def mergeKLists(self, lists: List[Optional[ListNode]]) -> Optional[ListNode]:
        no_of_lists = len(lists)
        sentinel_head = ListNode()
        current = sentinel_head
        heap = []

        for list_index in range(no_of_lists):
            while lists[list_index] is not None:
                heappush(heap, lists[list_index].val)
                lists[list_index] = lists[list_index].next

        for _ in range(len(heap)):
            val = heappop(heap)
            current.next = ListNode(val)
            current = current.next

        return sentinel_head.next