开源、轻量级数据处理框架Smallpond综合指南

开源、轻量级数据处理框架Smallpond综合指南

继 DeepSeek R1 的开创性影响之后,DeepSeek AI 的最新产品将继续推动创新:Smallpond。这一轻量级数据处理框架结合了用于 SQL分析 的 DuckDB 和用于高性能分布式存储的 3FS,旨在高效处理 PB 级数据集。Smallpond 有望简化人工智能和大数据应用的数据处理,消除对长期运行服务和复杂基础设施的需求,标志着 DeepSeek 团队的又一次重大飞跃。在本文中,我们将探讨 DeepSeek AI 的 Smallpond 框架的功能、组件和应用,并学习如何使用它。

学习目标

  • 了解 DeepSeek Smallpond 是什么,以及它如何扩展 DuckDB 进行分布式数据处理。
  • 了解如何安装 Smallpond、建立 Ray 集群和配置计算环境。
  • 了解如何使用 Smallpond 的应用程序接口摄取、处理和分割数据。
  • 确定人工智能培训、金融分析和日志处理等实际用例。
  • 权衡使用Smallpond进行分布式分析的优势和挑战。

什么是DeepSeek Smallpond?

Smallpond 是 DeepSeek AI 开发的一个开源、轻量级数据处理框架,旨在将DuckDB(一种高性能、进程内分析数据库)的功能扩展到分布式环境中。

通过将 DuckDB 与 Fire-Flyer 文件系统(3FS)集成,Smallpond 为处理 PB 级数据集提供了一个可扩展的解决方案,而无需像 Apache Spark 这样的传统大数据框架那样开销巨大。

作为 DeepSeek 开源周的一部分,Smallpond 于 2025 年 2 月 28 日发布,主要面向需要高效、简单和高性能分布式分析工具的数据工程师和科学家。

Smallpond 的主要特点

  • 高性能:利用 DuckDB 的本地 SQL 引擎和 3FS 每分钟数百兆字节的吞吐量。
  • 可扩展性:通过手动分区在分布式节点上处理 PB 级数据。
  • 简单:没有长期运行的服务或复杂的依赖关系–只需最少的设置即可部署和使用。
  • 灵活性:支持 Python (3.8-3.12),并与 Ray 集成用于并行处理。
  • 开源:MIT 许可,促进社区贡献和定制。

DeepSeek Smallpond的核心组件

现在让我们来了解 DeepSeek Smallpond 框架的核心组件。

DuckDB

DuckDB 是一个嵌入式、进程内 SQL OLAP 数据库,针对分析工作负载进行了优化。它擅长在大型数据集上执行复杂查询,延迟极小,是单节点分析的理想选择。Smallpond 将 DuckDB 的功能扩展到分布式系统,并保留了其性能优势。

3FS(Fire-Flyer 文件系统)

3FS 是 DeepSeek 为人工智能和高性能计算(HPC)工作负载设计的分布式文件系统。它利用现代固态硬盘和 RDMA 网络提供低延迟、高吞吐量的存储(例如,在 180 节点集群中的读吞吐量为 6.6 TiB/s)。与传统文件系统不同,3FS 优先考虑随机读取而不是缓存,这与人工智能培训和分析的需求相一致。

在Smallpond中集成DuckDB和3FS

在Smallpond中集成DuckDB和3FS

Smallpond 使用 DuckDB 作为计算引擎,使用 3FS 作为存储骨干。数据以 Parquet 格式存储在 3FS 上,由用户手动分区,并在 Ray 的协调下使用 DuckDB 实例跨节点并行处理。这种集成将 DuckDB 的查询效率与 3FS 的可扩展存储相结合,实现了无缝分布式分析。

开始使用Smallpond

现在,让我们来学习如何安装和使用 Smallpond。

第 1 步:安装

Smallpond 基于 Python,可通过 pip 安装,仅适用于 Linux 发行版。确保安装了 Python 3.8-3.11,以及一个兼容的 3FS 集群(或用于测试的本地文件系统)。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
# Install Smallpond with dependecies
pip install smallpond
# Optional: Install development dependencies (e.g., for testing)
pip install "smallpond[dev]"
# Install Ray Clusters
pip install 'ray[default]'
# Install Smallpond with dependecies pip install smallpond # Optional: Install development dependencies (e.g., for testing) pip install "smallpond[dev]" # Install Ray Clusters pip install 'ray[default]'
# Install Smallpond with dependecies
pip install smallpond
# Optional: Install development dependencies (e.g., for testing)
pip install "smallpond[dev]"
# Install Ray Clusters
pip install 'ray[default]'

对于 3FS,请从 GitHub 仓库克隆并构建:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
git clone https://github.com/deepseek-ai/3fs
cd 3fs
git submodule update --init --recursive
./patches/apply.sh
# Install dependencies (Ubuntu 20.04/22.04 example)
sudo apt install cmake libuv1-dev liblz4-dev libboost-all-dev
# Build 3FS (refer to 3FS docs for detailed instructions)
git clone https://github.com/deepseek-ai/3fs cd 3fs git submodule update --init --recursive ./patches/apply.sh # Install dependencies (Ubuntu 20.04/22.04 example) sudo apt install cmake libuv1-dev liblz4-dev libboost-all-dev # Build 3FS (refer to 3FS docs for detailed instructions)
git clone https://github.com/deepseek-ai/3fs
cd 3fs
git submodule update --init --recursive
./patches/apply.sh
# Install dependencies (Ubuntu 20.04/22.04 example)
sudo apt install cmake libuv1-dev liblz4-dev libboost-all-dev
# Build 3FS (refer to 3FS docs for detailed instructions)

第 2 步:设置环境

如果使用 3FS,请按照以下代码为 ray 集群初始化 ray 实例:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
#intialize ray accordingly
ray start --head --num-cpus=<NUM_CPUS> --num-gpus=<NUM_GPUS>
#intialize ray accordingly ray start --head --num-cpus=<NUM_CPUS> --num-gpus=<NUM_GPUS>
#intialize ray accordingly
ray start --head --num-cpus=<NUM_CPUS> --num-gpus=<NUM_GPUS>

运行上述代码将产生类似下图的输出结果:

Smallpond设置环境

现在,我们可以使用上图所示的地址通过 3FS 初始化 Ray。要在 smallpond 中初始化 Ray,请配置一个计算集群(如 AWS EC2、企业内部),在配备固态硬盘的节点上部署 3FS 或本地测试(Linux/Ubuntu)时,使用文件系统路径。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
import smallpond
# Initialize Smallpond session (local filesystem for testing)
sp = smallpond.init(data_root="Path/to/local/Storage",ray_address="192.168.214.165:6379")# Enter your own ray address
# For 3FS cluster (update with your 3FS endpoint and ray address)
sp = smallpond.init(data_root="3fs://cluster_endpoint",ray_address="192.168.214.165:6379")# Enter your own ray address
import smallpond # Initialize Smallpond session (local filesystem for testing) sp = smallpond.init(data_root="Path/to/local/Storage",ray_address="192.168.214.165:6379")# Enter your own ray address # For 3FS cluster (update with your 3FS endpoint and ray address) sp = smallpond.init(data_root="3fs://cluster_endpoint",ray_address="192.168.214.165:6379")# Enter your own ray address
import smallpond
# Initialize Smallpond session (local filesystem for testing)
sp = smallpond.init(data_root="Path/to/local/Storage",ray_address="192.168.214.165:6379")# Enter your own ray address 
# For 3FS cluster (update with your 3FS endpoint and ray address)
sp = smallpond.init(data_root="3fs://cluster_endpoint",ray_address="192.168.214.165:6379")# Enter your own ray address

第 3 步:数据输入和准备

支持的数据格式

Smallpond 主要支持 Parquet 文件,该文件针对列式存储和 DuckDB 兼容性进行了优化。其他格式(如 CSV)也可以通过 DuckDB 的本地功能来支持。

读写数据

使用 Smallpond 的高级 API 加载和保存数据。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
# Read Parquet file
df = sp.read_parquet("data/input.prices.parquet")
# Process data (example: filter rows)
df = df.map("price > 100") # SQL-like syntax
# Write results back to Parquet
df.write_parquet("data/output/filtered.prices.parquet")
# Read Parquet file df = sp.read_parquet("data/input.prices.parquet") # Process data (example: filter rows) df = df.map("price > 100") # SQL-like syntax # Write results back to Parquet df.write_parquet("data/output/filtered.prices.parquet")
# Read Parquet file
df = sp.read_parquet("data/input.prices.parquet")
# Process data (example: filter rows)
df = df.map("price > 100")  # SQL-like syntax
# Write results back to Parquet
df.write_parquet("data/output/filtered.prices.parquet")

数据分区策略

手动分区是 Smallpond 可扩展性的关键。请根据您的数据和工作负载选择策略:

  • 按文件数量:分割成固定数量的文件。
  • 按行:平均分配行数。
  • 按散列:根据列的哈希值进行分割,以实现均衡分配。
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
# Partition by file count
df = df.repartition(3)
# Partition by rows
df = df.repartition(3, by_row=True)
# Partition by column hash (e.g., ticker)
df = df.repartition(3, hash_by="ticker")
# Partition by file count df = df.repartition(3) # Partition by rows df = df.repartition(3, by_row=True) # Partition by column hash (e.g., ticker) df = df.repartition(3, hash_by="ticker")
# Partition by file count
df = df.repartition(3)
# Partition by rows
df = df.repartition(3, by_row=True)
# Partition by column hash (e.g., ticker)
df = df.repartition(3, hash_by="ticker")

Step 4:API引用

高级应用程序接口概述

高级应用程序接口简化了数据加载、转换和保存:

  • read_parquet(path) :加载 Parquet 文件。
  • write_parquet(path) :保存处理过的数据。
  • repartition(n, [by_row, hash_by]) :分割数据。
  • map(expr) :应用转换。

底层应用程序接口概述

对于高级应用,Smallpond 直接集成了 DuckDB 的 SQL 引擎和 Ray 的任务分配:

  • 通过 partial_sql 执行原始 SQL
  • 管理用于自定义并行的 Ray 任务。

详细函数说明

  • sp.read_parquet(path) :将 Parquet 文件读入分布式 DataFrame。
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
df = sp.read_parquet("3fs://data/input/*.parquet")
df = sp.read_parquet("3fs://data/input/*.parquet")
df = sp.read_parquet("3fs://data/input/*.parquet")
  • df.map(expr):应用类似 SQL 或 Python 的转换。
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
# SQL-like
df = df.map("SELECT ticker, price * 1.1 AS adjusted_price FROM {0}")
# Python function
df = df.map(lambda row: {"adjusted_price": row["price"] * 1.1})
# SQL-like df = df.map("SELECT ticker, price * 1.1 AS adjusted_price FROM {0}") # Python function df = df.map(lambda row: {"adjusted_price": row["price"] * 1.1})
# SQL-like
df = df.map("SELECT ticker, price * 1.1 AS adjusted_price FROM {0}")
# Python function
df = df.map(lambda row: {"adjusted_price": row["price"] * 1.1})
  • df.partial_sql(query, df):在数据帧上执行 SQL
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
df = sp.partial_sql("SELECT ticker, MIN(price), MAX(price) FROM {0} GROUP BY ticker", df)
df = sp.partial_sql("SELECT ticker, MIN(price), MAX(price) FROM {0} GROUP BY ticker", df)
df = sp.partial_sql("SELECT ticker, MIN(price), MAX(price) FROM {0} GROUP BY ticker", df)

性能基准测试

在 GraySort 等基准测试中,Smallpond 的性能大放异彩,在一个拥有 50 个节点、25 个 3FS 存储节点的计算集群上,它能在 30 分 14 秒内对 8,192 个分区的 110.5 TiB 数据进行排序(吞吐量为 3.66 TiB/分钟)。

Smallpond性能基准测试

优化性能的最佳实践

  • 明智分区:根据节点内存和工作负载匹配分区大小。
  • 充分利用 3FS:使用固态硬盘和 RDMA 获得最大 I/O 吞吐量。
  • 尽量减少分区:预先分区数据,减少网络开销。

可扩展性考虑因素

  • 10TB-1PB:适合拥有适度集群的 Smallpond。
  • 超过 1PB:需要大量基础设施(如 180 多个节点)。
  • 集群管理:使用托管 Ray 服务(如 Anyscale)简化扩展。

Smallpond的应用

  • 人工智能数据预处理:准备 PB 级规模的训练数据集。
  • 金融分析:汇总和分析分布式节点上的市场数据。
  • 日志处理:并行处理服务器日志,以获得实时见解。
  • DeepSeek的人工智能培训:使用 Smallpond 和 3FS 在 31 分钟内对 110.5 TiB 数据进行分类,支持高效的模型训练。

Smallpond的优缺点

功能 优点 缺点
可扩展性 高效处理 PB 级数据 集群管理开销
性能 优秀的基准性能 可能无法优化单节点性能
成本 开源且成本效益高 依赖外部框架
可用性 面向 ML 开发人员的用户友好 API 与 DeepSeek 人工智能模型有关的安全问题
架构 利用 DuckDB 和 Ray Core 进行分布式计算

小结

通过将 DuckDB 的分析能力与 3FS 的高性能存储相结合,Smallpond 重新定义了分布式数据处理。它的简易性、可扩展性和开源性使其成为现代数据工作流的理想选择。无论您是预处理人工智能数据集还是分析 TB 级的日志,Smallpond 都能为您提供轻便而强大的解决方案。深入其中,尝试使用代码,加入社区,塑造未来!

  • Smallpond 是一个开源的分布式数据处理框架,使用 3FS 和 Ray 扩展了 DuckDB 的 SQL 功能。
  • 它目前只支持 Linux 发行版,需要 Python 3.8-3.12。
  • Smallpond 是人工智能预处理、金融分析和大数据工作负载的理想选择,但需要谨慎的集群管理。
  • 它是 Apache Spark 的高性价比替代品,开销较低,易于部署。
  • 尽管它有很多优点,但需要考虑基础设施问题,例如集群设置和 DeepSeek 模型的安全问题。

评论留言