Tamkovich.com: Телеком/VoIP блог
Современные технологии: Asterisk, SIP, Kamailio, Linux, Cisco, Linksys
Kamailio: загружаем много данных с помощью db_mysql
20 октября, 2010 by Сергей Тамкович
ITSP, Программирование, Сделай сам Kamailio, MySQLВ Kamailio/OpenSIPS, как в любом взрослом проекте, имеется некоторое количество «прокладочных» интерфейсов. Задача этих интерфейсов — унификация. Например, с помощью mem/mem.h унифицирована работа с памятью. Интерфейс предоставляет набор вызовов pkg_malloc/pkg_free для работы с обычной памятью и shm_malloc/shm_free для работы с разделяемой памятью. Благодаря унификации работы с памятью — Kamailio портирован на множество различных платформ. Другим интерфейсом-прокладкой является интерфейс работы с базами данных DB API. Базовая работа с этим интерфейсом хорошо описана в KAMAILIO (OPENSER) Devel Guide.
Классическая операция получения данных из MySQL выглядит следующим образом:
snprintf(sqlcmd, sizeof(sqlcmd) - 1, "select n.id,n.name,n.ip,n.port,n.capacity,COUNT(c.node) from cluster_nodes n left join active_calls c on (n.name=c.node) where online=1 group by n.name"); sql_cmd.s = sqlcmd; sql_cmd.len = strlen(sqlcmd); LOG(L_DBG, "Trying to execute SQL: '%s'\n", sqlcmd); fres = dbf.raw_query(db_handle, &sql_cmd, &res); if (fres == 0) { total = RES_ROW_N(res); if (total > 0) { for(i = 0; i < total; i++) { node = shm_malloc(sizeof(struct ci_li_node)); if (node == NULL) { LM_ERR("Could not allocate shared memory\n"); return -1; } node->id = RES_ROWS(res)[i].values[0].val.int_val; node->calls_init = RES_ROWS(res)[i].values[5].val.int_val; node->calls_current = node->calls_init; node->calls_max = RES_ROWS(res)[i].values[4].val.int_val; node->port = RES_ROWS(res)[i].values[3].val.int_val; snprintf(node->name, sizeof(node->name), "%s", (char*)RES_ROWS(res)[i].values[1].val.str_val.s); snprintf(node->ip, sizeof(node->ip), "%s", (char*)RES_ROWS(res)[i].values[2].val.str_val.s); node->next = NULL; total_max += node->calls_max; if (mylist->last == NULL) { mylist->last = node; mylist->first = node; } else { mylist->last->next = node; mylist->last = node; } } LOG(L_DBG, "PRELOAD TOTAL: [servers] %d items = %lu bytes.\n", i, i * sizeof(struct ci_li_node)); mylist->dt = time(NULL); } dbf.free_result(db_handle, res); } |
Как видим из простого примера, получение данных из MySQL сводится к трём действиям:
- Отправляем SQL запрос с помощью команды raw_query.
- Читаем результаты с помощью макросов RES_ROW_N и RES_ROWS
- Подчищаем за собой память с помощью free_result
Пока мы действуем по шаблону — всё отлично работает.
Но стоит нам захотеть чего то большего — и подход придётся менять. Например приведенным выше кодом нельзя загрузить за раз большую таблицу (тысячи записей). Прокладочные интерфейсы БД и памяти — накладываются друг на друга и возникают интересные эффекты. Дело в том, что при попытке загрузить большую таблицу целиком — db_mysql выделяет память под неё — одним куском. С другой стороны, pkg_free не освобождает память моментально, а лишь помечает её как готовую к очистке. Всё это приводит к тому, что загрузка большой таблицы срабатывает 2 раза подряд, а на третий возникает ошибка — недостаточно памяти.
Зачем загружать всю таблицу целиком — спросите вы? Вариантов может быть очень много — самый распространённый, пожалуй, — для того, что бы закешировать данные из БД. На одной СУБД далеко не уедешь, даже на такой быстрой как MySQL. Итак, что же нужно сделать для того, что бы обойти проблемы с памятью? Ответ очень простой: нужно воспользоваться вызовом fetch_result. К сожалению, применение этого вызова не расписано в KAMAILIO (OPENSER) Devel Guide, но, как известно, лучшая документация это исходный код. С практикой применения этого вызова можно ознакомиться в модулях Kamailio:
[root@sipproxy kamailio-3.0.3]# grep -l fetch_result */*/*.c ... modules_k/dialog/dlg_db_handler.c modules_k/drouting/dr_load.c modules_k/htable/ht_db.c modules_k/pdt/pdt.c ...
Вызов fetch_result позволяет указать сколько строк мы хотим поместить в память. Таким образом, вызывая fetch_result в цикле — мы обработаем все строки возвращённые базой данных по порциям. Если вы планируете использовать fetch_result самостоятельно, необходимо передать NULL в качестве указателя db_res_t** _r у функций raw_query(…) и query(…). Для наглядности, рассмотрим похожий кусок кода, написанный с использованием вызова fetch_result. Параметр ci_preload_chunk задаёт количество строк загружаемых в память за 1 вызов:
snprintf(sqlcmd, sizeof(sqlcmd) - 1, "select name,owner from sip where host='dynamic' and owner>0" ); sql_cmd.s = sqlcmd; sql_cmd.len = strlen(sqlcmd); LOG(L_DBG, "Trying to execute SQL: '%s'\n", sqlcmd); fres = dbf.raw_query(db_handle, &sql_cmd, NULL); if (fres == 0) { LOG(L_DBG, "Trying to fetch first %d items.\n", ci_preload_chunk); if (dbf.fetch_result(db_handle, &res, ci_preload_chunk) < 0) { LM_ERR("Error while fetching result\n"); if (res) { dbf.free_result(db_handle, res); } return 0; } total = RES_ROW_N(res); LOG(L_DBG, "%d first items fetched.\n", total); if (total > 0) { do { for(i = 0; i < total; i++) { row = RES_ROWS(res) + i; u = shm_malloc(sizeof(struct ci_li_sipuser)); if (u == NULL) { LM_ERR("Could not allocate shared memory\n"); return -1; } u->sipaccount = atoi((char*)VAL_STRING(ROW_VALUES(row))); u->customer = VAL_INT(ROW_VALUES(row) + 1); u->next = NULL; if (mylist->last == NULL) { mylist->last = u; mylist->first = u; } else { mylist->last->next = u; mylist->last = u; } mylist->elements++; cnt++; } if (dbf.fetch_result(db_handle, &res, ci_preload_chunk) < 0) { LM_ERR("Error while fetching result\n"); if (res) { dbf.free_result(db_handle, res); } return 0; } total = RES_ROW_N(res); } while (total > 0); dbf.free_result(db_handle, res); } } |
ITSP, Программирование, Сделай сам Kamailio, MySQL