Параллельные вычисления#

Параллельные вычисления позволяют кратно ускорить расчет большого объема данных, за счет одновременного выполнения заданий по расчету.

Общий алгоритм параллельных вычислений в основной сессии:

  1. Создать пул потоков
    Основной характеристикой пула потоков является кол-во возможных одновременно исполняемых задач.

  2. Запросить данные для расчетов

  3. Разбить данные для расчета на пачки для параллельного вычисления
    По каждой пачке необходимо сформировать задание на расчет

  4. Направить задания для расчета в пул потоков
    При этом задания на расчет выполняются параллельно в пуле потоков

Планирование допустимого количества параллельных задач#

  1. Пусть S - количество доступных активных сессий базы данных
    S можно приблизительно рассчитать как кол-во ядер доступных postgresql*2

  2. Пусть С - количество ядер доступных серверу приложения

  3. Рассчитать количество потоков как:

     Наименьшее(С*2,S*2)
    

Примечание

Данный алгоритм дает приблизительную оценку, так как оптимальное кол-во потоков зависит от соотношения нагрузки на диски и процессоры, а так же от административных квот на оборудование

Инструменты для параллельных вычислений#

  • ru.bitec.app.gtk.eclipse.parallel.Parallel.withPool
    Метод получения пула для параллельных вычислений.

    Доступен в контексте прикладной сессии(api,pkg,avi).

  • ASQL"""select...""".withTempFileAs
    Запрос с сохранением результата в файл. Данный запрос позволяет минимизировать потребление памяти и сессий базы данных во время долгих вычислений.

    Доступен в контексте прикладной сессии.

  • dialogs.withInfoForm
    Отображает сообщение с индикацией расчета.

    Доступен в отображения

  • dialogs.showInfoForm
    Обновляет сообщение с индикацией расчета.

    Доступен в отображении

Совет

Дополнительную информацию смотрите в документации методов.

Шаблон параллельных вычислений#

dialogs.withInfoForm("Подготовка данных для расчета") {
    //Выполнить параллельные вычисления в 16 потоках
    Parallel.withPool(16) { pool =>
        //Размер пачки
        val batchSize = 500
        //Пачка для обработки
        val batch = ArrayBuffer.empty[NLong]
        //Количество обработанных записей
        var executedSize = 0

        //Отправка пачки на выполнение
        def submitBatch(): Unit= {
            val curSize = batch.size
            pool.submit(batch.toSet) { implicit session =>
                ids =>
                //ВНИМАНИЕ:
                //В процедуре вычисления не доступны данные из основной сессии
                //Поэтому работа может идти только с данными переданными
                //В процедуру submit, и полученными в текущие замыкание
                //в данном примере с именем ids
                //Передать можно только данные которые можно безопасно сериализовать
                //Попытка сослаться на другие данные приведет к ошибке в момент выполнения.

                //Массовая загрузка rop
                for (r <- Btk_QueryPkg().largeInQuery(SomeApi(), "id", ids)) {
                    //Выполнение вычислений
                }
            }.onSuccess { r =>
                //Обработка результатов выполнения идет в основном потоке
                //поэтому здесь доступно использование любых переменных
                
                //Расчет общего количества посчитанных данных
                executedSize = executedSize + curSize
                //Обновление диалога прогресса
                dialogs.showInfoForm(s"Выполнение расчетов ${executedSize}")
            }
            batch.clear()
        }
        
        //В данном примере запрос с большими данными для вычисления сохраняется в файл
        //Это позволяет сократить затраты по оперативной памяти и используемым
        //сессиям базы данных в момент расчет
        //Возможно работа с несколькими полями for(id~name<-...withTempFileAs(nLong("id")~nStr("id")) )
        //Для этого необходимо сделать import anorm._
        for (id <-
                ASQL"""
                SELECT t.id  FROM SomeTable t
                """.withTempFileAs(nLong("id"))
        ) {
            //Формирование пачек на параллельное выполнение
            batch += id
            if (batch.size >= batchSize) {
                submitBatch()
            }
        }
        //Вычисление оставшихся данных
        if (batch.nonEmpty) {
            submitBatch()
        }
    }
}