PipelineDB: работа с потоками данных

PR-2004-7

В предыдущих публикациях мы уже затрагивали проблему обработки событий в реальном масштабе времени. Сегодня мы хотели бы вновь вернутся к этой теме и рассказать о новом и весьма интересном инструменте — потоковой СУБД PipelineDB.

PipelineDB основана на кодовой базе PostgreSQL 9.4 и полностью с ней совместима. Её первый релиз состоялся в июле 2015 года, а в январе 2016 вышла в свет enterprise-версия.

Ниже мы сравним PipelineDB с существующими решениями аналогичного плана, приведём краткую инструкцию по установке и первичной настройке, а также разберём практический пример.

Обработка данных в реальном времени: экскурс в историю

Принцип работы PipelineDB можно сформулировать так: «постоянные запросы, кратковременные данные». В реляционных СУБД всё обстоит ровно наоборот: «кратковременные запросы, постоянные данные. В PipelineDB данные не хранятся, а поступают в потоке; их обработка происходит «на лету», в движении.

Первые попытки создания инструментов для обработки данных в движении восходят к концу 1980-х годов, когда появились так называемые активные базы данных (Active Database Systems). Они представляли собой расширения к существующим СУБД для обработки событий при помощи триггеров и правил. В качестве примера решений подобного плана можно назвать HiPAC, Starburst или ODE.
Широкого распространения они, однако, не получили: сфера их применения была достаточно узкой, а синтаксис правил — слишком сложным и запутанным.

В 1990-х — начале 2000-х появились системы управления потоками данных (Data Stream Management Systems): TelegraphCQ (форк PostgreSQL), StreamBase, StreamSQL. Принцип работы этих инструментов заключался в следующем: при помощи так называемых оконных операторов (window operators) потоки преобразовывались в таблицы, по отношению к которому затем можно было применять SQL-запросы.
Появление таких решений было несомненным шагом вперёд, но они не могли обеспечить высокую скорость и производительность при работе с большими потоками данных.

Инструменты, ориентированные на обработку данных без хранения, получили распространение в течение последних 5 — 6 лет. Из самых известных примеров следует выделить, в частности, Storm и Heron. Из появившихся относительно недавно — Apache Calcite. Все эти решения характеризуются сложностью установки и настройки, а также очень высоким порогом вхождения.

Преимущества PipelineDB перед упомянутыми выше инструментами очевидны:

  • простота настройки: чтобы начать работу, достаточно просто скачать и установить необходимые пакеты;
  • простота освоения (следствие совместимости с PostgreSQL).

Рассмотрим, как в PipelineDB строится работа с потоками данных. Начнём с анализа двух важнейших понятий: «непрерывное представление» и «поток».

Потоки и непрерывные представления

«Поток» и «непрерывное представление» — главные абстракции PipelineDB.
Поток — это последовательность событий. Запись событий в поток осуществляется точно так же, как запись в таблицы в реляционных ДБ (подробнее об этом см. здесь). Когда событие поступает в поток, к нему добавляется временная метка (timestamp).

Потоки в PipelineDB выполняют вспомогательную функцию, которая заключается в поставке данных для непрерывных представлений. В отличии от таблиц, для потоков не нужно создавать схемы. В поток можно записывать данные, пока он взаимодейстсвует хотя бы с одним непрерывным представлением.

Непрерывное представление (англ. continuous view) — это выборка из потоков и таблиц, обновляемая по мере поступления новых данных. В непрерывные представления попадают события, отбираемые по определённым параметрам.

Чтобы лучше понять, как работает PipelineDB, приведём несколько примеров непрерывных представлений.

Вот так, например, можно создать непрерывное представление для ежедневного подсчёта числа уникальных посетителей, приходящих на сайт по внешним ссылкам:

CREATE CONTINUOUS VIEW uniques AS
SELECT date_trunc('day', arrival_timestamp) AS day,
  referrer::text, COUNT(DISTINCT user_id::integer)
FROM users_stream GROUP BY day, referrer;

Ещё один пример — подсчёт числа показов рекламы на сайте за последние 5 минут:

CREATE CONTINUOUS VIEW imps AS
SELECT COUNT(*) FROM imps_stream
WHERE (arrival_timestamp > clock_timestamp() - interval '5 minutes');

Как видим, непрерывные представления имеют следующую форму:

CREATE CONTINUOUS VIEW name AS query

При создании непрерывного представления по отношению к потокам выполняется операция SELECT, с помощью которой отбираются данные, соответствующие требуемым параметрам.

Основные теоретические сведения, необходимые для понимания принципов работы PipelineDB, мы изложили. Переходим к практической части. Сначала мы опишем процедуру установки и первичной настройки PipelineDB, а затем перейдём к практическим примерам.

Установка и первичная настройка

Процедуру установки PipelineDB мы будем описывать на материале OC Ubuntu 14.04. Если вы используете другой дистрибутив Linux, обратитесь к официальной документации.

Чтобы установить PipelineDB, достаточно выполнить две команды:

$ wget https://www.pipelinedb.com/download/0.9.3/ubuntu14
$ sudo dpkg -i ubuntu14

После этого инициализируем сервер PipelineDB:

$ pipeline-init -D [имя директории]

В опции -D можно указывается имя новой директории, которая будет создана автоматически. Вот список содержимого этой директории:

base          pg_hba.conf    pg_replslot   pg_subtrans  pipelinedb.auto.conf
global        pg_ident.conf  pg_serial     pg_tblspc    pipelinedb.conf
pg_clog       pg_logical     pg_snapshots  pg_twophase  postmaster.opts
pg_commit_ts  pg_multixact   pg_stat       PG_VERSION   postmaster.pid
pg_dynshmem   pg_notify      pg_stat_tmp   pg_xlog

Основные настройки PipelineDB хранятся в файле pipelinedb.conf. Они почти не отличаются от соответствующих настроек PostgreSQL.

По умолчанию PipelineDB не может принимать соединения с удалённых хостов. Чтобы изменить эту настройку, откроем файл pipelinedb.conf, найдём в нём раздел Connections and Authentication, расскомментируем первую строку и отредактируем её следующим образом:

listen_addresses = '*'

После этого пропишем конкретные хосты в файле pg_hba.conf:

host    all             all             <IP-адрес>/<подсеть>            md5

Если нам нужно принимать соединения со всех возможных хостов, эта строка должна выглядеть так:

host    all             all             0.0.0.0/0            md5

Вот и всё. PipelineDB готова к работе.
Чтобы запустить её в фоновом режиме, выполним следующую команду:

$ pipeline-ctl -D [имя директории] -l pipelinedb.log start

Практический пример: анализируем статистику Википедии

Мы разобрали необходимую теорию, а также описали процедуры установки и первичной настройки PipelineDB. Переходим к особенностям использования PipelineDB на практике.

Мы рассмотрим интересный прмиер, который приводится в официальной документации PipelineDB: анализ статистики обращений к страницам Википедии и смежных проектов в час (Wiktionary, Wikisources, Wikibooks и другим). Эта статистика размещена в открытом доступе. Информация о каждом обращении представлена в виде записи, состоящей из следующих полей:

время обращения | проект | число просмотров за | всего обслужено байт

Нас будут интересовать максимальное, минимальное и среднее количество обращений к странице в течение часа, а также 99-й перцентиль обращений.
Активируем выполнение непрерывных запросов:

$ psql -h localhost -p 5432 -d pipeline -c "ACTIVATE"

После этого создадим непрерывное представление:

$ psql -h localhost -p 5432 -d pipeline -c "CREATE CONTINUOUS VIEW wiki_stats AS
SELECT hour::timestamp, project::text,
           count(*) AS total_pages,
           sum(view_count::bigint) AS total_views,
           min(view_count) AS min_views,
           max(view_count) AS max_views,
           avg(view_count) AS avg_views,
           percentile_cont(0.99) WITHIN GROUP (ORDER BY view_count) AS p99_views,
           sum(size::bigint) AS total_bytes_served
    FROM wiki_stream
    GROUP BY hour, project;"

CREATE CONTINUOUS VIEW

В приведённой команде указано, что мы будем получать данные для непрерывного представления из потока wiki_stream. Чтобы создать такой поток, нам потребуется загрузить с сайта данные, разархивировать, записать в стандартный вывод, а после этого передать PipelineDB с помощью команды COPY:

$ curl -sL http://pipelinedb.com/data/wiki-pagecounts | gunzip | \
        psql -h localhost -p 5432 -d pipeline -c "
        COPY wiki_stream (hour, project, title, view_count, size) FROM STDIN"

Отметим, что объём данных очень велик (они хранятся в виде архивов по 80-90 MБ каждый), и их загрузка может занять продолжительное время. Загрузку можно остановить в любой момент нажатием стандартной комбинации клавиш Ctrl+C.

По завершении загрузки выполним команду:

$ psql -h localhost -p 5432 -d pipeline -c "
SELECT * FROM wiki_stats ORDER BY total_views DESC";

Результат будет представлен в виде таблицы (приводим лишь небольшой фрагмент):

     hour         |     project     | total_pages | total_views | min_views | max_views |       avg_views        |    p99_views     | total_bytes_served
---------------------+-----------------+-------------+-------------+-----------+-----------+------------------------+------------------+-----------------
 2015-06-01 01:00:00 | en              |     2590639 |     7640042 |         1 |    368264 |     2.9490955706294856 | 28.8354562848382 |    247434016274
 2015-06-01 00:00:00 | en              |     2556148 |     7554038 |         1 |    406121 |     2.9552428106666750 | 29.2785253533473 |    243707663997
 2015-06-01 01:00:00 | en.mw           |           1 |     5560465 |   5560465 |   5560465 |   5560465.000000000000 |          5560465 |    143619712266
 2015-06-01 00:00:00 | en.mw           |           1 |     5398933 |   5398933 |   5398933 |   5398933.000000000000 |          5398933 |    137434148764
 2015-06-01 02:00:00 | en              |      810196 |     2411275 |         1 |     18485 |     2.9761625581957946 | 28.6634029920602 |    75540476402
 2015-06-01 00:00:00 | es              |      395449 |     1705754 |         1 |     19397 |     4.3134614071599625 | 53.0791353769321 |    37829076747
 2015-06-01 01:00:00 | es              |      407703 |     1696681 |         1 |     19306 |     4.1615612345261134 | 50.7215532793044 |    37355945443
 2015-06-01 01:00:00 | es.mw           |           1 |     1205844 |   1205844 |   1205844 |   1205844.000000000000 |          1205844 |    20934926923
 2015-06-01 00:00:00 | es.mw           |           1 |     1192257 |   1192257 |   1192257 |   1192257.000000000000 |          1192257 |    20642415036
 2015-06-01 01:00:00 | ja              |      386039 |     1079228 |         1 |     27601 |     2.7956449995984862 | 23.9804451096668 |    38244974161

Заключение

PipelineDB — интересный и перспективный продукт. Надеемся, что он будет успешно развиваться и в дальнейшем.
Если у вас есть опыт использования PipelineDB на практике — будем рады, если вы поделитесь опытом в комментариях.

Для желающих узнать больше приводим несколько полезных ссылок: