Параллельные вычисления
Contents
Параллельные вычисления#
Параллельные вычисления позволяют кратно ускорить расчет большого объема данных, за счет одновременного выполнения заданий по расчету.
Общий алгоритм параллельных вычислений в основной сессии:
Создать пул потоков
Основной характеристикой пула потоков является кол-во возможных одновременно исполняемых задач.Запросить данные для расчетов
Разбить данные для расчета на пачки для параллельного вычисления
По каждой пачке необходимо сформировать задание на расчетНаправить задания для расчета в пул потоков
При этом задания на расчет выполняются параллельно в пуле потоков
Планирование допустимого количества параллельных задач#
Пусть S - количество доступных активных сессий базы данных
S можно приблизительно рассчитать каккол-во ядер доступных postgresql
*2Пусть С - количество ядер доступных серверу приложения
Рассчитать количество потоков как:
Наименьшее(С*2,S*2)
Примечание
Данный алгоритм дает приблизительную оценку, так как оптимальное кол-во потоков зависит от соотношения нагрузки на диски и процессоры, а так же от административных квот на оборудование
Инструменты для параллельных вычислений#
ru.bitec.app.gtk.eclipse.parallel.Parallel.withPool
Метод получения пула для параллельных вычислений.Доступен в контексте прикладной сессии(
api
,pkg
,avi
).ASQL"""select...""".withTempFileAs
Запрос с сохранением результата в файл. Данный запрос позволяет минимизировать потребление памяти и сессий базы данных во время долгих вычислений.Доступен в контексте прикладной сессии.
dialogs.withInfoForm
Отображает сообщение с индикацией расчета.Доступен в отображения
dialogs.showInfoForm
Обновляет сообщение с индикацией расчета.Доступен в отображении
Совет
Дополнительную информацию смотрите в документации методов.
Шаблон параллельных вычислений#
dialogs.withInfoForm("Подготовка данных для расчета") {
//Выполнить параллельные вычисления в 16 потоках
Parallel.withPool(16) { pool =>
//Размер пачки
val batchSize = 500
//Пачка для обработки
val batch = ArrayBuffer.empty[NLong]
//Количество обработанных записей
var executedSize = 0
//Отправка пачки на выполнение
def submitBatch(): Unit= {
val curSize = batch.size
pool.submit(batch.toSet) { implicit session =>
ids =>
//ВНИМАНИЕ:
//В процедуре вычисления не доступны данные из основной сессии
//Поэтому работа может идти только с данными переданными
//В процедуру submit, и полученными в текущие замыкание
//в данном примере с именем ids
//Передать можно только данные которые можно безопасно сериализовать
//Попытка сослаться на другие данные приведет к ошибке в момент выполнения.
//Массовая загрузка rop
for (r <- Btk_QueryPkg().largeInQuery(SomeApi(), "id", ids)) {
//Выполнение вычислений
}
}.onSuccess { r =>
//Обработка результатов выполнения идет в основном потоке
//поэтому здесь доступно использование любых переменных
//Расчет общего количества посчитанных данных
executedSize = executedSize + curSize
//Обновление диалога прогресса
dialogs.showInfoForm(s"Выполнение расчетов ${executedSize}")
}
batch.clear()
}
//В данном примере запрос с большими данными для вычисления сохраняется в файл
//Это позволяет сократить затраты по оперативной памяти и используемым
//сессиям базы данных в момент расчет
//Возможно работа с несколькими полями for(id~name<-...withTempFileAs(nLong("id")~nStr("id")) )
//Для этого необходимо сделать import anorm._
for (id <-
ASQL"""
SELECT t.id FROM SomeTable t
""".withTempFileAs(nLong("id"))
) {
//Формирование пачек на параллельное выполнение
batch += id
if (batch.size >= batchSize) {
submitBatch()
}
}
//Вычисление оставшихся данных
if (batch.nonEmpty) {
submitBatch()
}
}
}