window(timeColumn, windowDuration, slideDuration, startTime)
Описание
Функция window()
создает временное окно для агрегации данных по времени. Используется для обработки потоковых данных и временных рядов.
Параметры
timeColumn
: Column - столбец с временными меткамиwindowDuration
: String - длительность окна (например, "5 minutes", "1 hour")slideDuration
: String (опционально) - длительность скольжения окна (по умолчанию равна windowDuration)startTime
: String (опционально) - время начала первого окна
Возвращаемое значение
Struct - структура с полями: - start
: Timestamp - начало окна - end
: Timestamp - конец окна
Пример использования
from pyspark.sql.functions import window, count
from pyspark.sql import SparkSession
from datetime import datetime
# Создаем SparkSession
spark = SparkSession.builder.appName("window_example").getOrCreate()
# Создаем DataFrame с временными метками
data = [
(datetime(2023, 1, 1, 10, 0), "event1"),
(datetime(2023, 1, 1, 10, 1), "event2"),
(datetime(2023, 1, 1, 10, 2), "event3"),
(datetime(2023, 1, 1, 10, 5), "event4")
]
df = spark.createDataFrame(data, ["timestamp", "event"])
# Группируем события по 5-минутным окнам
result = df.groupBy(
window("timestamp", "5 minutes")
).agg(
count("event").alias("event_count")
).show(truncate=False)
# Результат:
# +------------------------------------------+-----------+
# |window |event_count|
# +------------------------------------------+-----------+
# |{2023-01-01 10:00:00, 2023-01-01 10:05:00}|3 |
# |{2023-01-01 10:05:00, 2023-01-01 10:10:00}|1 |
# +------------------------------------------+-----------+
Примечания
- Временные окна не перекрываются, если
slideDuration
равноwindowDuration
- Для перекрывающихся окон укажите
slideDuration
меньшеwindowDuration
- NULL значения в
timeColumn
игнорируются - Для работы с временными рядами также используйте:
tumbling_window()
для неперекрывающихся оконsliding_window()
для перекрывающихся оконsession_window()
для сессионных окон