
What does PySpark need psutil for? (faced with "UserWarning: Please install psutil to have better support with spilling")

  •  Yu Chen little_birdie  ·  asked 6 years ago

    I'm just starting to learn Spark through pyspark, and I'm wondering what the following log message means:

    UserWarning: Please install psutil to have better support with spilling

    The operation that triggers the spill is a join between two RDDs:

    print(user_types.join(user_genres).collect())
    

    It may sound a bit obvious, but my first question is: what does "spilling" actually mean here?

    I did install psutil and the warning went away, but I'd like to understand what is actually going on. There is a very similar question here, but the OP there mostly asked how to install psutil.

    1 Answer  |  6 years ago
  •  Rahul Chawla cbiqih  ·  answered 6 years ago

    The code behind this warning lives in PySpark's shuffle implementation (pyspark/shuffle.py), linked here:

    # module-level imports used by the excerpt below (as in pyspark/shuffle.py)
    import gc
    import os
    import platform
    import warnings

    try:
        import psutil
        def get_used_memory():
            """ Return the used memory in MB """
            process = psutil.Process(os.getpid())
            if hasattr(process, "memory_info"):
                info = process.memory_info()
            else:
                info = process.get_memory_info()
            return info.rss >> 20
    except ImportError:
        def get_used_memory():
            """ Return the used memory in MB """
            if platform.system() == 'Linux':
                for line in open('/proc/self/status'):
                    if line.startswith('VmRSS:'):
                        return int(line.split()[1]) >> 10
            else:
                warnings.warn("Please install psutil to have better "
                              "support with spilling")
                if platform.system() == "Darwin":
                    import resource
                    rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                    return rss >> 20
                # TODO: support windows
            return 0
    
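    In other words, psutil is used here only to measure how much memory the Python worker process is currently using (its resident set size, in MB). Without psutil, PySpark falls back to reading /proc/self/status on Linux; on other platforms it emits exactly this warning and then either uses resource.getrusage (macOS) or simply returns 0. A minimal sketch of the same measurement, assuming psutil is installed:

    import os
    import psutil

    # report this process's resident set size in MB, mirroring what
    # get_used_memory() does on the psutil code path above
    rss_mb = psutil.Process(os.getpid()).memory_info().rss >> 20
    print("used memory: %d MB" % rss_mb)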

    def mergeCombiners(self, iterator, check=True):
            """ Merge (K,V) pair by mergeCombiner """
            iterator = iter(iterator)
            # speedup attribute lookup
            d, comb, batch = self.data, self.agg.mergeCombiners, self.batch
            c = 0
            for k, v in iterator:
                d[k] = comb(d[k], v) if k in d else v
                if not check:
                    continue
                c += 1
                if c % batch == 0 and get_used_memory() > self.memory_limit:
                    self._spill()
                    self._partitioned_mergeCombiners(iterator, self._next_limit())
                    break
    
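    This is where that measurement matters: while merging (K, V) pairs for a shuffle (which is what a Python-side join like the one above ends up doing), the merger checks get_used_memory() against self.memory_limit every self.batch merged items, and once the limit is exceeded it spills the partially merged data to disk via _spill(). The limit is taken from the spark.python.worker.memory setting (512m by default), so one knob is to raise it if you would rather use memory than spill. A sketch, assuming you build your own local SparkContext:

    from pyspark import SparkConf, SparkContext

    # raise the per-worker aggregation memory that the external merger compares
    # against get_used_memory() before it starts spilling (default is 512m)
    conf = SparkConf().set("spark.python.worker.memory", "1g")
    sc = SparkContext(conf=conf)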

    def _spill(self):
            """
            dump already partitioned data into disks.
            It will dump the data in batch for better performance.
            """
            global MemoryBytesSpilled, DiskBytesSpilled
            path = self._get_spill_dir(self.spills)
            if not os.path.exists(path):
                os.makedirs(path)
            used_memory = get_used_memory()
            if not self.pdata:
                # The data has not been partitioned yet: iterate over the dataset
                # once and write each item into its per-partition file, using no
                # additional memory. This branch only runs the first time memory
                # goes above the limit.
                # open all the files for writing
                streams = [open(os.path.join(path, str(i)), 'w')
                           for i in range(self.partitions)]
                for k, v in self.data.iteritems():
                    h = self._partition(k)
                    # dump one item at a time to stay compatible with load_stream;
                    # dumping them in batches would increase memory usage
                    self.serializer.dump_stream([(k, v)], streams[h])
                for s in streams:
                    DiskBytesSpilled += s.tell()
                    s.close()
                self.data.clear()
                self.pdata = [{} for i in range(self.partitions)]
            else:
                for i in range(self.partitions):
                    p = os.path.join(path, str(i))
                    with open(p, "w") as f:
                        # dump items in batch
                        self.serializer.dump_stream(self.pdata[i].iteritems(), f)
                    self.pdata[i].clear()
                    DiskBytesSpilled += os.path.getsize(p)
            self.spills += 1
            gc.collect()  # release the memory as much as possible
            MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
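
    When the limit is hit, _spill() writes each hash partition of the in-memory dict to its own file on disk, clears the dict, and increments the MemoryBytesSpilled / DiskBytesSpilled counters (these feed the spill metrics reported for the task); the spilled files are merged back in later when the final result is produced. So psutil does not do any spilling itself: it just gives PySpark an accurate, cross-platform memory reading for deciding when to spill, which is why the warning says support for spilling is merely "better" with it installed. As an illustrative sketch (the names are made up), a shuffle-heavy job like the one below is the kind of workload that can hit this path under a low worker memory limit:

    # reusing the sc from the sketch above; a wide shuffle whose merged values
    # can exceed spark.python.worker.memory and be spilled by _spill()
    pairs = sc.parallelize(range(1000000)).map(lambda i: (i % 100, i))
    print(pairs.groupByKey().mapValues(len).collect())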