Перейти к основному содержимому

Использование InfluxDB с Java

· 6 мин. чтения

1. Обзор

InfluxDB — это высокопроизводительное хранилище данных временных рядов. Он поддерживает вставку и запрос данных в реальном времени с помощью языка запросов, подобного SQL.

В этой вводной статье мы покажем, как подключиться к серверу InfluxDb, создать базу данных, записать информацию о временных рядах, а затем выполнить запрос к базе данных.

2. Настройка

Чтобы подключиться к базе данных, нам нужно добавить запись в наш файл pom.xml :

<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.8</version>
</dependency>

Последнюю версию этой зависимости можно найти на Maven Central .

Нам также понадобится экземпляр InfluxDB. Инструкции по загрузке и установке базы данных можно найти на веб-сайте InfluxData .

3. Подключение к серверу

3.1. Создание соединения

Для создания соединения с базой данных необходимо передать строку URL и учетные данные пользователя в фабрику соединений:

InfluxDB influxDB = InfluxDBFactory.connect(databaseURL, userName, password);

3.2. Проверка соединения

Связь с базой данных осуществляется через RESTful API , поэтому они не являются постоянными.

API предлагает специальную службу «ping», чтобы подтвердить работоспособность соединения. Если соединение хорошее, ответ содержит версию базы данных. Если нет, он содержит «неизвестно».

Итак, после создания соединения мы можем проверить его, выполнив:

Pong response = this.influxDB.ping();
if (response.getVersion().equalsIgnoreCase("unknown")) {
log.error("Error pinging server.");
return;
}

3.3. Создание базы данных

Создание базы данных InfluxDB аналогично созданию базы данных на большинстве платформ. Но нам нужно создать хотя бы одну политику хранения перед ее использованием.

Политика хранения сообщает базе данных, как долго должна храниться часть данных. Временные ряды, такие как статистика ЦП или памяти, имеют тенденцию накапливаться в больших наборах данных.

Типичной стратегией управления размером баз данных временных рядов является субдискретизация . «Необработанные» данные сохраняются с высокой скоростью, обобщаются, а затем через короткое время удаляются.

Политики хранения упрощают это, связывая часть данных со сроком действия. У InfluxData есть подробное объяснение на их сайте.

После создания базы данных мы добавим одну политику с именем defaultPolicy. Он просто будет хранить данные в течение 30 дней:

influxDB.createDatabase("foreach");
influxDB.createRetentionPolicy(
"defaultPolicy", "foreach", "30d", 1, true);

Чтобы создать политику хранения, нам потребуются имя, база данных, интервал , коэффициент репликации (который должен быть равен 1 для базы данных с одним экземпляром) и логическое значение , указывающее, что это политика по умолчанию.

3.4. Настройка уровня ведения журнала

Внутри InfluxDB API использует Retrofit и предоставляет интерфейс для средства ведения журнала Retrofit через перехватчик ведения журнала.

Итак, мы можем установить уровень ведения журнала, используя:

influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);

И теперь мы можем видеть сообщения, когда мы открываем соединение и пингуем его:

Dec 20, 2017 5:38:10 PM okhttp3.internal.platform.Platform log
INFO: --> GET http://127.0.0.1:8086/ping

Доступные уровни: BASIC , FULL , HEADERS и NONE.

4. Добавление и получение данных

4.1. Точки

Итак, теперь мы готовы начать вставку и извлечение данных.

Базовой единицей информации в InfluxDB является точка, которая по сути представляет собой метку времени и карту «ключ-значение».

Давайте посмотрим на точку, содержащую данные об использовании памяти:

Point point = Point.measurement("memory")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.addField("name", "server1")
.addField("free", 4743656L)
.addField("used", 1015096L)
.addField("buffer", 1010467L)
.build();

Мы создали запись, содержащую три значения Long в качестве статистики памяти, имя хоста и отметку времени.

Давайте посмотрим, как добавить это в базу данных.

4.2. Пакетная запись

Данные временных рядов, как правило, состоят из множества мелких точек, и запись этих записей по одной за раз была бы очень неэффективной. Предпочтительным методом является сбор записей в пакеты.

API InfluxDB предоставляет объект BatchPoint :

BatchPoints batchPoints = BatchPoints
.database(dbName)
.retentionPolicy("defaultPolicy")
.build();

Point point1 = Point.measurement("memory")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.addField("name", "server1")
.addField("free", 4743656L)
.addField("used", 1015096L)
.addField("buffer", 1010467L)
.build();

Point point2 = Point.measurement("memory")
.time(System.currentTimeMillis() - 100, TimeUnit.MILLISECONDS)
.addField("name", "server1")
.addField("free", 4743696L)
.addField("used", 1016096L)
.addField("buffer", 1008467L)
.build();

batchPoints.point(point1);
batchPoints.point(point2);
influxDB.write(batchPoints);

Мы создаем BatchPoint , а затем добавляем к нему Points . Мы устанавливаем метку времени для нашей второй записи на 100 миллисекунд в прошлом, поскольку метки времени являются первичным индексом. Если мы отправим две точки с одной и той же меткой времени, останется только одна.

Обратите внимание, что мы должны связать BatchPoints с базой данных и политикой хранения.

4.3. Написание по одному

Пакетная обработка может быть непрактичной для некоторых случаев использования.

Давайте включим пакетный режим одним вызовом соединения InfluxDB:

influxDB.enableBatch(100, 200, TimeUnit.MILLISECONDS);

Мы включили пакетную обработку 100 для вставки на сервер или отправки того, что у него есть, каждые 200 миллисекунд.

При включенном пакетном режиме мы все еще можем писать по одному. Однако требуется дополнительная настройка:

influxDB.setRetentionPolicy("defaultPolicy");
influxDB.setDatabase(dbName);

Более того, теперь мы можем писать отдельные баллы, и они собираются пакетами фоновым потоком:

influxDB.write(point);

Прежде чем мы поставим в очередь отдельные точки, нам нужно установить базу данных (аналогично команде use в SQL) и установить политику хранения по умолчанию. Поэтому, если мы хотим воспользоваться преимуществом понижения частоты дискретизации с несколькими политиками хранения, создание пакетов — это то, что нужно.

Пакетный режим использует отдельный пул потоков. Поэтому рекомендуется отключить его, когда он больше не нужен:

influxDB.disableBatch();

Закрытие соединения также закроет пул потоков:

influxDB.close();

4.4. Сопоставление результатов запроса

Запросы возвращают QueryResult , который мы можем сопоставить с POJO.

Прежде чем мы рассмотрим синтаксис запроса, давайте создадим класс для хранения нашей статистики памяти:

@Measurement(name = "memory")
public class MemoryPoint {

@Column(name = "time")
private Instant time;

@Column(name = "name")
private String name;

@Column(name = "free")
private Long free;

@Column(name = "used")
private Long used;

@Column(name = "buffer")
private Long buffer;
}

Класс снабжен аннотацией @Measurement(name = «memory») , соответствующей Point.measurement(«memory») , которую мы использовали для создания наших Points .

Для каждого поля в нашем QueryResult мы добавляем аннотацию @Column(name = «XXX») с именем соответствующего поля.

QueryResults сопоставляются с POJO с помощью InfluxDBResultMapper.

4.5. Запрос InfluxDB

Итак, давайте используем наш POJO с точками, которые мы добавили в базу данных в нашем пакете из двух точек:

QueryResult queryResult = connection
.performQuery("Select * from memory", "foreach");

InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<MemoryPoint> memoryPointList = resultMapper
.toPOJO(queryResult, MemoryPoint.class);

assertEquals(2, memoryPointList.size());
assertTrue(4743696L == memoryPointList.get(0).getFree());

Запрос показывает, как наше измерение с именем memory хранится в виде таблицы точек , из которой мы можем выбирать .

InfluxDBResultMapper принимает ссылку на MemoryPoint.class с QueryResult и возвращает список точек.

После сопоставления результатов мы убеждаемся, что получили два, проверяя длину списка , полученного из запроса. Затем мы смотрим на первую запись в списке и видим размер свободной памяти второй точки, которую мы вставили. По умолчанию результаты запросов из InfluxDB упорядочиваются по возрастанию по отметке времени.

Давайте изменим это:

queryResult = connection.performQuery(
"Select * from memory order by time desc", "foreach");
memoryPointList = resultMapper
.toPOJO(queryResult, MemoryPoint.class);

assertEquals(2, memoryPointList.size());
assertTrue(4743656L == memoryPointList.get(0).getFree());

Добавление порядка по времени описания меняет порядок результатов.

Запросы InfluxDB очень похожи на SQL. На их сайте есть обширный справочник .

5. Вывод

Мы подключились к серверу InfluxDB, создали базу данных с политикой хранения, а затем вставили и получили данные с сервера.

Полный исходный код примеров закончился на GitHub .