Skip to content

window(timeColumn, windowDuration, slideDuration, startTime)

Описание

Функция window() создает временное окно для агрегации данных по времени. Используется для обработки потоковых данных и временных рядов.

Параметры

  • timeColumn: Column - столбец с временными метками
  • windowDuration: String - длительность окна (например, "5 minutes", "1 hour")
  • slideDuration: String (опционально) - длительность скольжения окна (по умолчанию равна windowDuration)
  • startTime: String (опционально) - время начала первого окна

Возвращаемое значение

Struct - структура с полями: - start: Timestamp - начало окна - end: Timestamp - конец окна

Пример использования

from pyspark.sql.functions import window, count
from pyspark.sql import SparkSession
from datetime import datetime

# Создаем SparkSession
spark = SparkSession.builder.appName("window_example").getOrCreate()

# Создаем DataFrame с временными метками
data = [
    (datetime(2023, 1, 1, 10, 0), "event1"),
    (datetime(2023, 1, 1, 10, 1), "event2"),
    (datetime(2023, 1, 1, 10, 2), "event3"),
    (datetime(2023, 1, 1, 10, 5), "event4")
]
df = spark.createDataFrame(data, ["timestamp", "event"])

# Группируем события по 5-минутным окнам
result = df.groupBy(
    window("timestamp", "5 minutes")
).agg(
    count("event").alias("event_count")
).show(truncate=False)

# Результат:
# +------------------------------------------+-----------+
# |window                                     |event_count|
# +------------------------------------------+-----------+
# |{2023-01-01 10:00:00, 2023-01-01 10:05:00}|3         |
# |{2023-01-01 10:05:00, 2023-01-01 10:10:00}|1         |
# +------------------------------------------+-----------+

Примечания

  • Временные окна не перекрываются, если slideDuration равно windowDuration
  • Для перекрывающихся окон укажите slideDuration меньше windowDuration
  • NULL значения в timeColumn игнорируются
  • Для работы с временными рядами также используйте:
  • tumbling_window() для неперекрывающихся окон
  • sliding_window() для перекрывающихся окон
  • session_window() для сессионных окон