Параллельные вычисления#

Параллельные вычисления позволяют кратно ускорить расчет большого объема данных, за счет одновременного выполнения заданий по расчету.

Общий алгоритм параллельных вычислений в основной сессии:

  1. Создать пул потоков - основной характеристикой пула потоков является кол-во возможных одновременно исполняемых задач.

  2. Запросить данные для расчетов.

  3. Разбить данные для расчета на пачки для параллельного вычисления - по каждой пачке необходимо сформировать задание на расчет.

  4. Направить задания для расчета в пул потоков - при этом задания на расчет выполняются параллельно в пуле потоков.

Планирование допустимого количества параллельных задач#

  1. Пусть S - количество доступных активных сессий базы данных
    S можно приблизительно рассчитать как кол-во ядер доступных postgresql*2.

  2. Пусть С - количество ядер доступных серверу приложения.

  3. Рассчитать количество потоков как:

     Наименьшее(С*2,S*2).
    

Примечание

Данный алгоритм дает приблизительную оценку, так как оптимальное кол-во отоков зависит от соотношения нагрузки на диски и процессоры, а так же от административных квот на оборудование.

Инструменты для параллельных вычислений#

  • Метод получения пула для параллельных вычислений - ru.bitec.app.gtk.eclipse.parallel.Parallel.withPool
    Доступен в контексте прикладной сессии (api,pkg,avi).

  • Запрос с сохранением результата в файл - ASQL"""select...""".withTempFileAs. Данный запрос позволяет минимизировать потребление памяти и сессий базы данных во время долгих вычислений.

    Доступен в контексте прикладной сессии.

  • Отображает сообщение с индикацией расчета - dialogs.withInfoForm . Доступен в отображения.

  • Обновляет сообщение с индикацией расчета - dialogs.showInfoForm.
    Доступен в отображении.

Совет

Дополнительную информацию смотрите в документации методов.

Шаблон параллельных вычислений#

dialogs.withInfoForm("Подготовка данных для расчета") {
    //Выполнить параллельные вычисления в 16 потоках
    Parallel.withPool(16) { pool =>
        //Размер пачки
        val batchSize = 500
        //Пачка для обработки
        val batch = ArrayBuffer.empty[NLong]
        //Количество обработанных записей
        var executedSize = 0

        //Отправка пачки на выполнение
        def submitBatch(): Unit= {
            val curSize = batch.size
            pool.submit(batch.toSet) { implicit session =>
                ids =>
                //ВНИМАНИЕ:
                //В процедуре вычисления не доступны данные из основной сессии
                //Поэтому работа может идти только с данными переданными
                //В процедуру submit, и полученными в текущие замыкание
                //в данном примере с именем ids
                //Передать можно только данные которые можно безопасно сериализовать
                //Попытка сослаться на другие данные приведет к ошибке в момент выполнения.

                //Массовая загрузка rop
                for (r <- Btk_QueryPkg().largeInQuery(SomeApi(), "id", ids)) {
                    //Выполнение вычислений
                }
            }.onSuccess { r =>
                //Обработка результатов выполнения идет в основном потоке
                //поэтому здесь доступно использование любых переменных
                
                //Расчет общего количества посчитанных данных
                executedSize = executedSize + curSize
                //Обновление диалога прогресса
                dialogs.showInfoForm(s"Выполнение расчетов ${executedSize}")
            }
            batch.clear()
        }
        
        //В данном примере запрос с большими данными для вычисления сохраняется в файл
        //Это позволяет сократить затраты по оперативной памяти и используемым
        //сессиям базы данных в момент расчет
        //Возможно работа с несколькими полями for(id~name<-...withTempFileAs(nLong("id")~nStr("id")) )
        //Для этого необходимо сделать import anorm._
        for (id <-
                ASQL"""
                SELECT t.id  FROM SomeTable t
                """.withTempFileAs(nLong("id"))
        ) {
            //Формирование пачек на параллельное выполнение
            batch += id
            if (batch.size >= batchSize) {
                submitBatch()
            }
        }
        //Вычисление оставшихся данных
        if (batch.nonEmpty) {
            submitBatch()
        }
    }
}