Source code for jacinle.concurrency.queue

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : queue.py
# Author : Jiayuan Mao
# Email  : maojiayuan@gmail.com
# Date   : 01/19/2018
#
# This file is part of Jacinle.
# Distributed under terms of the MIT license.

import queue
import heapq

__all__ = ['ListToFill', 'iter_queue', 'sorted_iter']


[docs] class ListToFill(list):
[docs] def __init__(self, nr_target): super().__init__() self._nr_target = nr_target
[docs] def append(self, *args, **kwargs): super().append(*args, **kwargs) if len(self) >= self._nr_target: raise queue.Full()
[docs] def iter_queue(q, total=None): if total is None: while True: yield q.get() else: for i in range(total): yield q.get()
[docs] def sorted_iter(iter, id_func=None): if id_func is None: id_func = lambda x: x[0] current = -1 buffer = [] for i, v in enumerate(iter): if v is None: assert len(buffer) == 0, 'Buffer is not empty when receiving stop signal.' break idv = id_func(v) if idv == current + 1: yield idv, v current += 1 while len(buffer) and id_func(buffer[0]) == current + 1: ele = heapq.heappop(buffer) yield ele[0], ele[2] current += 1 else: heapq.heappush(buffer, (idv, i, v))