Apache Iceberg的崛起:数据湖的游戏规则改变者
多年来,通常建立在云对象存储(如谷歌云存储GCS)上的数据湖提供了无与伦比的可扩展性和成本效率。然而,它们往往缺乏传统数据仓库中的关键特性,如事务一致性、模式演变和分析查询的性能优化。这正是Apache Iceberg大放异彩的地方。
Apache Iceberg是一种开放表格式,旨在解决这些限制。它位于云存储中的数据文件(如Parquet、ORC或Avro)之上,提供了一层元数据,将文件集合转变为高性能、类似SQL的表。以下是Iceberg如此强大的原因:
通过在数据湖之上提供这些类似数据仓库的功能,Apache Iceberg实现了真正的“数据湖仓”,提供了两者的最佳结合:云存储的灵活性和成本效益与结构化表的可靠性和性能。
谷歌云的BigLake表在BigQuery中支持Apache Iceberg提供类似于标准BigQuery表的全托管表体验,但所有数据都存储在客户自有的存储桶中。支持的功能包括:
以下是如何使用GoogleSQL创建一个空的BigLake Iceberg表的示例:
SQL
CREATE TABLE PROJECT_ID.DATASET_ID.my_iceberg_table (
name STRING,
id INT64
)
WITH CONNECTION PROJECT_ID.REGION.CONNECTION_ID
OPTIONS (
file_format = 'PARQUET'
table_format = 'ICEBERG'
storage_uri = 'gs://BUCKET/PATH');
然后您可以导入数据到数据中使用LOAD INTO
从文件导入数据或INSERT INTO
从另一个表中。
SQL
# Load from file
LOAD DATA INTO PROJECT_ID.DATASET_ID.my_iceberg_table
FROM FILES (
uris=['gs://bucket/path/to/data'],
format='PARQUET');
# Load from table
INSERT INTO PROJECT_ID.DATASET_ID.my_iceberg_table
SELECT name, id
FROM PROJECT_ID.DATASET_ID.source_table
除了全托管服务外,Apache Iceberg还支持作为BigQuery中的读取-外部表。使用此功能指向具有数据文件的现有路径。
SQL
CREATE OR REPLACE EXTERNAL TABLE PROJECT_ID.DATASET_ID.my_external_iceberg_table
WITH CONNECTION PROJECT_ID.REGION.CONNECTION_ID
OPTIONS (
format = 'ICEBERG',
uris =
['gs://BUCKET/PATH/TO/DATA'],
require_partition_filter = FALSE);
Apache Spark:数据湖仓分析的引擎
虽然Apache Iceberg为您的数据湖仓提供了结构和管理,但Apache Spark是将其带入生活的处理引擎。Spark是一个强大的开源分布式处理系统,以其速度、灵活性和处理多样化大数据工作负载的能力而闻名。Spark的内存处理、包括ML和基于SQL的处理在内的强大工具生态系统以及对Iceberg的深度支持使其成为一个绝佳的选择。
Apache Spark深度集成到谷歌云生态系统中。使用Apache Spark在谷歌云上的好处包括:
Iceberg + Spark:更好的结合
Iceberg和Spark共同构成了构建高性能和可靠数据湖仓的强大组合。Spark可以利用Iceberg的元数据来优化查询计划、执行高效的数据修剪,并确保数据湖中的事务一致性。
您的Iceberg表和BigQuery本地表可以通过BigLake元存储访问。这使您的表可以通过BigQuery兼容的开源引擎(包括Spark)进行访问。
Python
from pyspark.sql import SparkSession
# Create a spark session
spark = SparkSession.builder \
.appName("BigLake Metastore Iceberg") \
.config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
.config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
.getOrCreate()
spark.conf.set("viewsEnabled","true")
# Use the blms_catalog
spark.sql("USE `CATALOG_NAME`;")
spark.sql("USE NAMESPACE DATASET_NAME;")
# Configure spark for temp results
spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")
# List the tables in the dataset
df = spark.sql("SHOW TABLES;")
df.show();
# Query the tables
sql = """SELECT * FROM DATASET_NAME.TABLE_NAME"""
df = spark.read.format("bigquery").load(sql)
df.show()
sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
df = spark.read.format("bigquery").load(sql)
df.show()
sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME"""
df = spark.read.format("bigquery").load(sql)
df.show()
扩展BigLake元存储功能的是Iceberg REST目录(预览中)使用任何数据处理引擎访问Iceberg数据。以下是使用Spark进行连接的方法:
Python
import google.auth
from google.auth.transport.requests import Request
from google.oauth2 import service_account
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
catalog = ""
spark = SparkSession.builder.appName("") \
.config("spark.sql.defaultCatalog", catalog) \
.config(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog") \
.config(f"spark.sql.catalog.{catalog}.type", "rest") \
.config(f"spark.sql.catalog.{catalog}.uri",
"https://biglake.googleapis.com/iceberg/v1beta/restcatalog") \
.config(f"spark.sql.catalog.{catalog}.warehouse", "gs://") \
.config(f"spark.sql.catalog.{catalog}.token", "") \
.config(f"spark.sql.catalog.{catalog}.oauth2-server-uri", "https://oauth2.googleapis.com/token") \ .config(f"spark.sql.catalog.{catalog}.header.x-goog-user-project", "") \ .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config(f"spark.sql.catalog.{catalog}.io-impl","org.apache.iceberg.hadoop.HadoopFileIO") \ .config(f"spark.sql.catalog.{catalog}.rest-metrics-reporting-enabled", "false") \
.getOrCreate()
完整的数据湖仓
Google Cloud提供了一整套服务,补充了Apache Iceberg和Apache Spark,使您能够轻松构建、管理和扩展您的数据湖仓,同时利用您已经使用的许多开源技术:
结论
在Google Cloud上结合使用Apache Iceberg和Apache Spark为构建现代高性能数据湖仓提供了一个引人注目的解决方案。Iceberg提供了数据湖中历史上缺失的事务一致性、模式演变和性能优化,而Spark则提供了一个多功能且可扩展的引擎来处理这些大型数据集。