# Параллельные вычисления Параллельные вычисления позволяют кратно ускорить расчет большого объема данных, за счет одновременного выполнения заданий по расчету. Общий алгоритм параллельных вычислений в основной сессии: 1. Создать пул потоков \ Основной характеристикой пула потоков является кол-во возможных одновременно исполняемых задач. 2. Запросить данные для расчетов 3. Разбить данные для расчета на пачки для параллельного вычисления \ По каждой пачке необходимо сформировать задание на расчет 4. Направить задания для расчета в пул потоков \ При этом задания на расчет выполняются параллельно в пуле потоков ## Планирование допустимого количества параллельных задач 1. Пусть S - количество доступных активных сессий базы данных \ S можно приблизительно рассчитать как `кол-во ядер доступных postgresql`*2 2. Пусть С - количество ядер доступных серверу приложения 3. Рассчитать количество потоков как: ``` Наименьшее(С*2,S*2) ``` ```{note} Данный алгоритм дает приблизительную оценку, так как оптимальное кол-во потоков зависит от соотношения нагрузки на диски и процессоры, а так же от административных квот на оборудование ``` ## Инструменты для параллельных вычислений - `ru.bitec.app.gtk.eclipse.parallel.Parallel.withPool` \ Метод получения пула для параллельных вычислений. Доступен в контексте прикладной сессии(`api`,`pkg`,`avi`). - `ASQL"""select...""".withTempFileAs` \ Запрос с сохранением результата в файл. Данный запрос позволяет минимизировать потребление памяти и сессий базы данных во время долгих вычислений. Доступен в контексте прикладной сессии. - `dialogs.withInfoForm` \ Отображает сообщение с индикацией расчета. Доступен в отображения - `dialogs.showInfoForm` \ Обновляет сообщение с индикацией расчета. Доступен в отображении ```{tip} Дополнительную информацию смотрите в документации методов. ``` ## Шаблон параллельных вычислений ```scala dialogs.withInfoForm("Подготовка данных для расчета") { //Выполнить параллельные вычисления в 16 потоках Parallel.withPool(16) { pool => //Размер пачки val batchSize = 500 //Пачка для обработки val batch = ArrayBuffer.empty[NLong] //Количество обработанных записей var executedSize = 0 //Отправка пачки на выполнение def submitBatch(): Unit= { val curSize = batch.size pool.submit(batch.toSet) { implicit session => ids => //ВНИМАНИЕ: //В процедуре вычисления не доступны данные из основной сессии //Поэтому работа может идти только с данными переданными //В процедуру submit, и полученными в текущие замыкание //в данном примере с именем ids //Передать можно только данные которые можно безопасно сериализовать //Попытка сослаться на другие данные приведет к ошибке в момент выполнения. //Массовая загрузка rop for (r <- Btk_QueryPkg().largeInQuery(SomeApi(), "id", ids)) { //Выполнение вычислений } }.onSuccess { r => //Обработка результатов выполнения идет в основном потоке //поэтому здесь доступно использование любых переменных //Расчет общего количества посчитанных данных executedSize = executedSize + curSize //Обновление диалога прогресса dialogs.showInfoForm(s"Выполнение расчетов ${executedSize}") } batch.clear() } //В данном примере запрос с большими данными для вычисления сохраняется в файл //Это позволяет сократить затраты по оперативной памяти и используемым //сессиям базы данных в момент расчет //Возможно работа с несколькими полями for(id~name<-...withTempFileAs(nLong("id")~nStr("id")) ) //Для этого необходимо сделать import anorm._ for (id <- ASQL""" SELECT t.id FROM SomeTable t """.withTempFileAs(nLong("id")) ) { //Формирование пачек на параллельное выполнение batch += id if (batch.size >= batchSize) { submitBatch() } } //Вычисление оставшихся данных if (batch.nonEmpty) { submitBatch() } } } ```