Сервер TCP/IP... много серверов хороших и разных.
Чаще всего,
если это не приходится делать очень часть (т.е. не является основной
спецификой работы), при необходимости написания TCP/IP сервера
используются одна из двух <классических> технологий:
последовательный сервер, или параллельный сервер на основе fork() (Windows-программисты в этом случае пишут сервер на
основе thread). Хотя реально можно
предложить гораздо больше принципиально различных серверов, которые
будут существенно отличаться своей сложностью, временем реакции на
запрос клиента и т.д. Ниже описано несколько из таких способов с
результатами их тестирования. Программы делались и испытывались в
OS QNX 6.2.1, но могут (за исключением
специально оговоренного случая) практически без изменений
использоваться в любой UNIX-like OS, а за некоторым исключением - и в Windows.
1.
Постановка задачи: мы напишем специальный тестовый TCP/IP клиент, который
посылает требуемое число раз запрос к серверу (ретранслятору),
принимает от него ответ, и тут же разрывает соединение. Серия
запросов от клиента делается для усреднения результата и для того
(как будет видно далее), чтобы исключить (или учесть) эффекты
кэширования памяти. Клиент измеряет время (точнее - число циклов
процессора) между отправкой запроса серверу и приходом ответа от
него. Сервера в этом анализе являются простыми ретрансляторами. Все
показанные программы - предложены в упрощённых вариантах: не везде
сделана полная обработка ошибочных ситуаций (что, вообще-то говоря,
крайне необходимо), и сознательно не включена обработка
сигнала SIGCHLD, которая должна
препятствовать появлению <зомби> процессов. Все приводимые коды
программ - работающие и апробированные: весь результирующий вывод
скопирован непосредственно с консоли задачи. Весь приводимый
программный код транслировался компилятором gcc-2-95 в нотации языка C++ (хотя специфические особенности С++, за
исключением потокового ввода-вывода С++ и не
использованы).
2. Клиент. Собственно клиент
размещён в файле cli.cpp, но он, совместно с сервером, использует общие
файлы common.h &
common.cpp, все эти
файлы с краткими комментариями приведены ниже:
common.h - в этом файле
определены различные порты TCP, по которым
клиент будет связываться с различными модификациями серверов. Кроме
того, здесь определены:
-
функция завершения по критической ошибке;
-
единая процедура ретрансляции через сокет, которую используют все
сервера(для единообразия и корректности сравнений);
-
функция подготовки прослушивающего сокета
TCP/IP (для того,
чтобы устранить этот, достаточно объёмный, код из кода серверов,
рассматриваемых ниже).
#if !defined( __COMMON_H )
#define __COMMON_H
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include
<iostream.h>
#include <netdb.h>
const int PORT =
9000,
/* программа: */
SINGLE_PORT =
PORT,
/* ech0 */
FORK_PORT = PORT +
1,
/* ech1 */
FORK_LARGE_PORT = PORT + 2, /* ech10
*/
PREFORK_PORT = PORT +
3, /* ech11
*/
INET_PORT = PORT +
4,
/* ech3 */
THREAD_PORT = PORT +
5, /*
ech2 */
THREAD_POOL_PORT = PORT + 6, /* ech21
*/
PRETHREAD_PORT = PORT + 7; /*
ech22 */
const int MAXLINE = 40;
// критическая
ошибка ...
void errx( const char *msg, int err = EOK
);
//
ретранслятор тестовых пакетов TCP
void retrans( int sc );
//
создание и подготовка прослушивающего сокета
int getsocket( in_port_t
);
#endif
common.cpp - реализационная
часть:
#include "common.h"
// ошибка
...
void errx( const char *msg, int err = EOK )
{
perror( msg
);
if( err != EOK ) errno =
err;
exit( EXIT_FAILURE
);
};
//
ретранслятор тестовых пакетов TCP
static char data[ MAXLINE
];
void retrans( int sc ) {
int rc = read( sc, data,
MAXLINE );
if( rc > 0 )
{
rc = write(
sc, data, strlen( data ) + 1 );
if ( rc <
0 ) perror( "write data failed" );
}
else if( rc < 0 ) { perror(
"read data failed" ); return; }
else if( rc == 0 ) { cout
<< "client closed connection" << endl; return;
};
return;
};
//
создание и подготовка прослушивающего сокета
struct sockaddr_in addr;
int getsocket( in_port_t p )
{
int rc = 1,
ls;
if( ( ls = socket( AF_INET,
SOCK_STREAM, 0 ) ) = -1 )
errx( "create
stream socket failed" );
if( setsockopt( ls, SOL_SOCKET,
SO_REUSEADDR, &rc, sizeof( rc ) ) != 0 )
errx( "set
socket option failed" );
memset( &addr, 0, sizeof(
addr ) );
addr.sin_len = sizeof( addr
);
addr.sin_family =
AF_INET;
addr.sin_port = htons( p
);
addr.sin_addr.s_addr = htonl(
INADDR_ANY );
if( bind( ls, (struct
sockaddr*)&addr, sizeof( sockaddr ) ) != 0 )
errx( "bind
socket address failed" );
if( listen( ls, 25 ) != 0 )
errx( "put socket in listen state failed" );
return ls;
};
cli.cpp - код
клиента:
#include
<inttypes.h>
#include
<sys/neutrino.h>
#include
<sys/syspage.h>
#include
<sys/procfs.h>
#include "common.h"
//
установка параметров клиентов: порт и число повторений
static void setkey( int argc, char *argv[],
in_port_t* port, int* num ) {
int opt,
val;
while ( ( opt = getopt(
argc, argv, "p:n:") ) != -1 ) {
switch( opt ) {
case 'p' :
if( sscanf( optarg, "%i", &val ) != 1 )
errx( "parse command line failed", EINVAL );
*port = (in_port_t)val;
break;
case 'n' :
if( ( sscanf( optarg, "%i", &val ) != 1 ) || ( val <= 0 )
)
errx( "parse command line failed", EINVAL );
*num = val;
break;
default :
errx( "parse command line failed", EINVAL );
break;
};
};
};
//
клиент - источник потока тестовых пакетов TCP
int main( int argc, char *argv[] )
{
in_port_t listen_port =
SINGLE_PORT;
int num = 10;
setkey( argc, argv,
&listen_port, &num );
char data[ MAXLINE ], echo[
MAXLINE ];
uint64_t cps = cps =
SYSPAGE_ENTRY( qtime )->cycles_per_sec;
cout << "TCP port = "
<< listen_port << ", number of echoes = " << num
<< endl
<< "time of reply - Cycles [usec.] :" <<
endl;
for( int i = 0; i < num; i++
) {
int rc,
ls;
if( ( ls =
socket( AF_INET, SOCK_STREAM, 0 ) ) < 0 )
errx(
"create stream socket failed" );
struct
sockaddr_in addr;
memset(
&addr, 0, sizeof( addr ) );
addr.sin_len
= sizeof( addr );
addr.sin_family = AF_INET;
addr.sin_port
= htons( listen_port );
inet_aton(
"localhost", &addr.sin_addr );
if( ( rc =
connect( ls, (struct sockaddr*)&addr, sizeof( sockaddr ) ) )
< 0 )
errx(
"connect failed" );
sprintf(
data, "%d", rand() );
uint64_t
cycle = ClockCycles();
if( ( rc =
write( ls, data, strlen( data ) + 1 ) ) <= 0 )
errx( "write
data failed" );
rc = read(
ls, echo, MAXLINE );
cycle =
ClockCycles() - cycle;
if( rc < 0
) errx( "read data failed" );
if( rc == 0 )
errx( "server closed connection" );
if( strcmp(
data, echo ) != 0 ) { cout << "wrong data" << endl;
break; };
cout <<
cycle << "[" << cycle * 1000000 / cps <<
"]";
if( i % 5 ==
4 ) cout << endl; else cout << '\t'; cout <<
flush;
close( ls
);
delay( 100
);
};
if( num % 5 != 0 ) cout
<< endl;
exit( EXIT_SUCCESS
);
};
После запуска клиент анализирует ключи запуска. Предусмотрены
значения: <-p> значение порта подключения
(по умолчанию - последовательный сервер, порт 9000), и <-n> - число запросов к серверу в серии (по
умолчанию - 10). Каждый запрос представляет собой случайное число,
генерируемое клиентом, в символьной форме. Ретранслированный
сервером ответ сверяется с запросом для дополнительного контроля.
Клиент подключается к серверу по петлевому интерфейсу 127.0.0.1, что
вполне достаточно для сравнительного анализа. Далее мы рассмотрим
его работу с различными серверами.
3.
Последовательный сервер ретранслятор. Такой сервер нас
интересует только как эталон для сравнения: он имеет минимальное
время реакции, т.к. не затрачивается время на порождение каких-либо
механизмов параллелизма. С другой стороны, такой сервер, зачастую,
просто неинтересен, т.к. не позволяет обслуживать других клиентов до
завершения текущего обслуживания.
Все
сервера имеют крайне простой код, потому что большая часть <рутины>
снесена в файлы common (h & cpp). Вот код 1-го
используемого нами - последовательного сервера (файл ech0.cpp):
#include "common.h"
int main( int argc, char *argv[] )
{
int ls = getsocket( SINGLE_PORT
), rs;
while( true )
{
if( ( rs =
accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error"
);
retrans( rs
);
close( rs
);
cout <<
"*" << flush;
};
exit( EXIT_SUCCESS
);
};
Вот результаты выполнения клиента с этим сервером (указано
число машинных циклов ожидания, а в скобках - для справки - время в
микросекундах для процессора Celeron
533Mhz):
/root/ForkThread # cli -p9000 -n20
TCP port =
9000, number of echoes =
20
time of reply - Cycles [usec.]
:
868325[1624]
135364[253]
135287[253]
133438[249] 133057[248]
136061[254]
133554[249]
133887[250]
138776[259] 131237[245]
134748[252]
133823[250]
135650[253]
130583[244] 134562[251]
132601[248]
134622[251]
134516[251]
132055[246] 134139[250]
Отчётливо виден (1-й запрос) эффект, который мы отнесли к
эффектам кэширования памяти программ - различие времени выполнения
первого и последующих запросов. В каталоге проекта есть ещё один
(тестовый) вариант последовательного сервера, код которого
выглядит несколько иначе:
#include
<sys/neutrino.h>
#include "common.h"
int main( int argc, char *argv[] )
{
int ls = getsocket( SINGLE_PORT
), rs, i = 0;
while( true )
{
if( ( rs =
accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error"
);
uint64_t
cycle = ClockCycles();
retrans( rs
);
cycle =
ClockCycles() - cycle;
close( rs
);
cout <<
cycle;
if( i++ % 5
== 4 ) cout << endl; else cout << '\t'; cout <<
flush;
};
exit( EXIT_SUCCESS
);
};
Он отличается тем, что <хронометрирует> (для справки)
оценочно число циклов на ретрансляцию (затрачиваемые внутри
сервера). Все типы серверов используют общую процедуру retrans() и
единые затраты <чистого времени>. Приведём для справки эти оценки
(только машинные циклы):
/root/ForkThread # ech0_
757808 60862 60085
60444 60197
61111 60565 60154
59121 59984
Видно, что это время составляет около 50% времени,
наблюдаемого со стороны клиента, которое включает в себя время
реакции на accept() (со стороны сервера),
2-кратные затраты write() + read() (со стороны как клиента, так и сервера),
время передачи буферов по петлевому интерфейсу и т.п.
4. <Классический> параллельный сервер. Ниже приведен
код такого <классического> сервера (в отличающейся части), в котором
обслуживающий процесс порождается fork()
после разблокирования на accept()(файл
ech1.cpp):
#include "common.h"
int main( int argc, char
*argv[] ) {
int ls = getsocket( FORK_PORT
), rs;
while( true )
{
if( ( rs =
accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error"
);
pid_t pid =
fork();
if( pid <
0 ) errx( "fork error" );
if( pid == 0
) {
close( ls
);
retrans( rs
);
close( rs
);
cout
<< "*" <<
flush;
exit( EXIT_SUCCESS
);
}
else close(
rs );
};
exit( EXIT_SUCCESS
);
};
После
выхода из accept() (получение запроса connect() от клиента) - порождается отдельный
обслуживающий процесс, который тут же закрывает свою копию
прослушивающего сокета, производит ретрансляцию через соединённый
сокет, завершает соединение и завершается сам. Родительский же
процесс закрывает свою копию соединённого сокета и продолжает
прослушивание канала. Вот результаты выполнения такого
сервера:
/root/ForkThread # cli -p9001
-n20
TCP port = 9001, number of echoes =
20
time of reply - Cycles [usec.]
:
2219652[4151] 1467470[2744]
1470056[2749] 1466860[2743]
1469294[2748]
1466875[2743] 1467612[2745]
1489083[2785] 1475620[2759]
1665398[3114]
1472091[2753] 1471635[2752]
1481768[2771] 1462214[2734]
1467229[2744]
1468731[2747] 1466483[2742]
1465499[2741] 1461780[2734]
1649821[3085]
Да .
. . время реакции больше чем на порядок превышает простой
последовательный сервер. Видно заметно меньше (относительно)
выраженный эффект кэширования - вновь создаваемое адресное
пространство процесса повторно не используется, однако некоторое
влияние кэширования сказывается (в программе на стороне
клиента?). Добавим в код сервера 1 строчку - перед точкой
main (файл ech10.cpp - и изменён
порт):
static long MEM[ 2500000
];
/root/ForkThread # cli
-p9002
TCP port = 9002,
number of echoes =
10
time of reply - Cycles [usec.]
:
67061908[125432] 64674322[120966]
64126835[119942] 63071907[117969]
64185096[120051]
65478368[122470] 64495464[120632]
64533852[120703] 63831652[119390]
64407915[120468]
Строки вывода перенесены мною, потому, что он уже не
помещаются в формат страницы: время реакции увеличилось почти в 50
раз, превышает время реакции простейшего последовательного сервера
уже почти на 3 порядка (500 раз, или 1000 раз по <чистому> времени
обслуживания), и составляет уже 0.12 секунды на каждый запрос. Что
произошло? При порождении нового процесса по fork() (можно считать, что здесь затраты не столь
большие - из предыдущей таблицы: порядка 1.5 млн. циклов) - OS обязана перекопировать образ задачи (к которой
мы добавили ~20Mb) из адресного пространства
одного процесса, а пространство другого. И не посредством memcpy(), а запросами к ядру системы, потому как
копирование идёт между различными защищёнными образами!
Какие
предварительные итоги можно сделать из рассматриваемых результатов?
Во-первых, то, что OS QNX определённо не
использует технику <copy on write> (COW) для копирования образов порождаемых по fork копий процессов, а, во-вторых, : меняет ли
что-то принципиально применение COW в других
OS, например в Linux? Думаю, что <скорее нет>, т.к. радикальное
снижение начального времени реакции (времени латентности) при
использовании COW оборачивается только
скрытием тех же затрат, но <распределённых> по интервалу
обслуживанию. Т.е., использование COW
эффективно только как <рекламный>, <рыночный> трюк, рассчитанный на
гипнотическое воздействие на конечного потребителя некоторых
<магических> тестовых цифр: и уж категорически неприменимо для realtime OS, поведение которых во времени должно
быть строго детерминировано.
5.
Параллельный сервер с предварительным созданием копий. Так
что же получается: для серверов, работающих на высоко интенсивных
потоках запросов, с традиционным fork-методом всё так плохо? Отнюдь! Нужно только
поменять fork & accept местами - создать заранее некоторый пул
обслуживающих процессов, каждый из которых до прихода клиентского
запроса будет заблокирован на accept (кстати
- accept на одном и том же прослушиваемом
сокете). А после отработки клиентского запроса заблаговременно
создать новый обслуживающий процесс. Эта техника известна как
<предварительный fork> или pre-fork. Меняем текст
сервера (файл ech11.cpp):
#include "common.h"
const int NUMPROC = 3;
int main( int argc, char *argv[] )
{
int ls = getsocket(
PREFORK_PORT ), rs;
for( int i = 0; i < NUMPROC;
i++ ) {
if( fork() ==
0 ) {
int
rs;
while( true
) {
if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error"
);
retrans( rs );
close( rs );
cout << i << flush;
delay( 250 );
};
};
};
for( int i = 0; i < NUMPROC;
i++ ) waitpid( 0, NULL, 0 );
exit( EXIT_SUCCESS
);
};
При
написании этого текста я несколько <схитрил> и упростил в сравнении
с предложенной абзацем выше моделью. Здесь 3 обслуживающих процесса
сделаны циклическими и не завершаются по окончанию обслуживания, а
снова блокируются на accept, но для
наблюдения эффектов этого вполне достаточно (последняя строка нужна
вообще только для блокировки родительского процесса, и <сохранения>
управляющего терминала - для возможности прекращения всей группы по
^C):
# pidin
...
6901868 1
./ech11
10r REPLY
94228
6901869 1
./ech11
10r REPLY
94228
6901870 1
./ech11
10r REPLY
94228
/root/ForkThread # cli
-p9003
TCP port = 9003, number of echoes =
10
time of reply - Cycles [usec.]
:
854276[1597]
138356[258]
135665[253]
131656[246] 136653[255]
132532[247]
133583[249]
134639[251]
136363[255] 131482[245]
Время
реакции - практически равно последовательному серверу, чего мы и
добивались. В этой программе добавлен вывод идентификатора (i) обрабатывающего процесса (предыдущие сервера
выводили только символ <*> для идентификации факта обработки
запроса). Для этого добавлена и задержка <пере-активизации> процесса
delay(250) - больше 2-х периодов запросов
клиентов, чтоб заставить обрабатывающие процессы чередоваться. Вот
возможный вид протокола сервера:
/root/ForkThread #
2012012012201201201220120120122012012012
Хорошо видно нарушение периодичности последовательности
идентификационных номеров
процессов: после периода простоя всегда обслуживание
осуществляется процессом с индексом 2 (максимальным) - при
множественном блокировании на acept() первым разблокируется процесс,
заблокировавшийся последним (!?).
В
принципе, не так и сложно в такой схеме сделать и динамический пул
процессов, как будет показано ниже для потоков - с той лишь
некоторой сложностью, что здесь каждый процесс выполняется в своём
закрытом адресном пространстве, и для их взаимной синхронизации
придётся использовать что-то из механизмов IPC.
6.
Прежде, чем переходить к потоковым (thread)
реализациям, рассмотрим ещё один fork-вариант: использование суперсервера inetd. При этом весь сервис по запуску
процессов-копий нашего приложения, и перенаправлению его стандартных
потоков ввода-вывода в сокет - возьмёт на себя inetd. Вот полный текст ретранслирующего сервера
для этого случая (файл ech3.cpp):
#include <stdio.h>
#include "common.h"
static char data[ MAXLINE
];
void main( void ) { write( STDOUT_FILENO,
data, read( STDIN_FILENO, data, MAXLINE ) );
};
Просто? Мне кажется, что - очень. Теперь настроим на наше
приложение и запустим inetd
- Дописываем в конфигурационный файл /etc/services строку,
определяющую порт, через который будет вызываться
приложение:
ech3
9004/tcp
- В
конфигурационный файл файл /etc/inetd.conf добавляем
строку, которая определяет режим обслуживания и конкретные параметры
вызываемого приложения:
ech3 stream tcp
nowait root /root/ForkThread/ech3 ech3
-
Запускаем
inetd:
/root/ForkThread # inetd
&
При
заполнении строк концигурационных файлов нужна особая тщательность,
если заголовки сервисов (ech3) в файлах не будут совпадать, то вы
получите просто ошибку связи:
/root/ForkThread # cli
-p9004
TCP port = 9004, number of echoes =
10
time of reply - Cycles [usec.]
:
connect failed: Connection
refused
Проверить, что inetd настроен на прослушивание нашего порта
можно так:
/etc # netstat -a
Active Internet connections (including
servers)
Proto Recv-Q Send-Q Local
Address
Foreign Address
State
...
tcp
0 0
*.ech3
*.*
LISTEN
Заставить inetd перечитать свои конфигурационные файлы после
каждой правки /etc/services или /etc/inetd.conf вы можете,
послав ему сигнал SGHUP, например:
/etc # pidin
...
10231922 1
usr/sbin/inetd 10r
SIGWAITINFO
/etc # kill -SIGHUP
10231922
Если
ошибка допущена в полном имени программы сервера (поля 6-7 строки
inetd.conf), то мы тоже получим не сразу объяснимый
результат:
/root/ForkThread # cli
-p9004
TCP port = 9004, number of echoes =
10
time of reply - Cycles [usec.]
:
server closed connection
...
и, наконец, если всё в настройке inetd правильно, то получим нечто
похожее:
/root/ForkThread # cli
-p9004
TCP port = 9004, number of echoes =
10
time of reply - Cycles [usec.]
:
16442468[30753] 14169659[26502]
14354292[26848] 14160723[26486] 14187182[26535]
14145131[26457] 14411884[26955]
14761467[27609] 14207573[26573] 14491483[27104]
Отметим, что время реакции в несколько раз (до 10-ти) выше
прямой реализации с fork (inetd ведь также <скрыто> делает fork), но зато какая простота и трудоёмкость!
Характерно почти полное отсутствие эффектов кэширования. Для
серверов, обслуживающих <неплотный> поток запросов - это, пожалуй,
оптимальное решение (кстати, большинство <штатных> сетевых сервисов
UNIX выполняется именно по такой
схеме).
7.
Сервер, использующий pthread_create по запросу обслуживания клиента (файл
ech2.cpp):
#include <pthread.h>
#include "common.h"
void* echo( void* ps ) {
int sc =
*(int*)ps;
sched_yield();
retrans( sc
);
close( sc
);
cout << "*"
<<
flush;
return
NULL;
}
int main( int argc, char *argv[] )
{
int ls = getsocket( THREAD_PORT
), rs;
while( true )
{
if( ( rs =
accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error"
);
if(
pthread_create( NULL, NULL, &echo, &rs ) != EOK ) errx(
"thread create error" );
sched_yield();
};
exit( EXIT_SUCCESS
);
};
Минимальные комментарии: 2 вызова sched_yield() (в
вызывающем потоке, и, позже, в функции обслуживания созданного
потока) - предназначены для гарантии копирования созданным потоком
переданного дескриптора сокета до его повторного переопределения в
цикле вызывающего потока. Результаты выполнения
программы:
/root/ForkThread # cli
-p9005
TCP port = 9005, number of echoes =
10
time of reply - Cycles [usec.]
:
2493948[4664] 266123[497]
269490[504]
279049[521] 267775[500]
266880[499]
288175[539]
268589[502]
267990[501] 267003[499]
Это
только в 2 раза (в 3, если оценивать по <чистому> времени) хуже
простого последовательного сервера. Чрезвычайно сильно выражен
эффект кэширования - вся обработка последовательности запросов
производится на едином (многократно используемом) пространстве
адресов.
8.
Сервер с предварительным созданием потоков. Поступим по
аналогии с pre-fork, и создадим фиксированный пул потоков
предварительно (pre-thread, файл ech22.cpp):
#include <pthread.h>
#include "common.h"
static int ntr = 3;
/*число thread в пуле*/
static pthread_mutex_t mutex =
PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t condvar =
PTHREAD_COND_INITIALIZER;
void* echo( void* ps ) {
int sc = *(int*)ps,
rs;
sched_yield();
if( ( rs = accept( sc,
NULL, NULL ) ) < 0 ) errx( "accept error" );
retrans( rs
);
close( rs
);
pthread_mutex_lock(
&mutex );
ntr++;
pthread_cond_signal(
&condvar );
pthread_mutex_unlock(
&mutex );
cout <<
pthread_self() << flush;
delay( 250
);
return
NULL;
}
int main( int argc, char *argv[] )
{
int ls = getsocket(
PRETHREAD_PORT ),
rs;
while( true )
{
if(
pthread_create( NULL, NULL, &echo, &ls ) != EOK ) errx(
"thread create error" );
sched_yield();
pthread_mutex_lock( &mutex );
ntr--;
while( ntr
<= 0 ) pthread_cond_wait( &condvar, &mutex
);
pthread_mutex_unlock( &mutex );
};
exit( EXIT_SUCCESS
);
};
Здесь
accept (как и раньше в случае prefork) перенесен в обрабатывающий поток (все
thread блокированы в accept на единственном прослушивающем сокете). Для
синхронизации я использую условную переменную, но могут применятся
любые из синхронизирующих примитивов. Испытываем полученную
программу:
/root/ForkThread # cli
-p9007
TCP port = 9007, number of echoes =
10
time of reply - Cycles [usec.]
:
879988[1645]
134687[251]
137152[256]
136303[254] 693676[1297]
138605[259]
140320[262]
138937[259]
136886[256] 342027[639]
Время
реакции очень близко к последовательному серверу (к минимально
достижимому потенциально!). Потоки обработчики на сервере
идентифицируют себя своим tid:
/root/ForkThread # ech22
4567891011121314151617181920212223
Хорошо видно последовательное порождение нового потока для
обработки каждого запроса клиента. Так же сильно, как и в предыдущем
случае, выражены эффекты кэширования.
9.
Можно создать сколь угодно сложный диспетчер, поддерживающий
оптимальное число потоков (или процессов) в сервере, но в OS QNX от уже предоставлен как стандартное
средство системы: потоковый пул (thread_pool_*). Сервер с
использованием динамического пула потоков (файл ech21.cpp):
#include <pthread.h>
#include
<sys/dispatch.h>
#include "common.h"
static int ls;
THREAD_POOL_PARAM_T *alloc(
THREAD_POOL_HANDLE_T *h ) { return (THREAD_POOL_PARAM_T*)h;
};
THREAD_POOL_PARAM_T *block(
THREAD_POOL_PARAM_T *p ) {
int rs = accept( ls,
NULL, NULL );
if( rs < 0 ) errx(
"accept error" );
return
(THREAD_POOL_PARAM_T*)rs;
};
int handler( THREAD_POOL_PARAM_T *p )
{
retrans( (int)p
);
close( (int)p
);
delay( 250
);
cout <<
pthread_self() <<
flush;
return
0;
};
int main( int argc, char *argv[] )
{
ls = getsocket(
THREAD_POOL_PORT
);
thread_pool_attr_t
attr;
memset( &attr, 0, sizeof(
thread_pool_attr_t ) );
attr.lo_water = 3;
/* заполнение блока атрибутов пула */
attr.hi_water = 7;
attr.increment =
2;
attr.maximum =
9;
attr.handle =
dispatch_create();
attr.context_alloc =
alloc;
attr.block_func =
block;
attr.handler_func =
handler;
void *tpp = thread_pool_create(
&attr, POOL_FLAG_USE_SELF ) ;
if( tpp == NULL ) errx(
"create pool" );
thread_pool_start( tpp
);
exit( EXIT_SUCCESS
);
};
Всё,
сервер готов - почти всё необходимое за нас сделала библиотека
OS.
Грубо, логика работы пула потоков QNX следующая:
-
начально создаётся attr.lo_water (<нижняя
ватерлиния>) потоков;
-
для каждого потока при создании вызывается функция *attr.context_alloc;
- эта
функция по завершению вызовет (сама) блокирующую функцию потока
*attr.block_func;
- эта
функция, после разблокирования (accept)
вызовет функцию обработчика *attr.handler_func, которой в
качестве параметра (в нашем тексте) передаст дескриптор
присоединённого сокета;
- как
только число заблокированных потоков станет ниже attr.lo_water - механизм пула создаст дополнительно attr.increment
потоков;
-
если число блокированных потоков в какой-то момент превысит attr.hi_water (<верхняя ватерлиния>) - <лишние> потоки
будут уничтожены;
- .
. . и всё это так, чтобы общее число потоков (выполняющиеся +
блокированные) не превышало attr.maximum.
Это -
уникально мощный механизм, с очень широкой функциональностью, но за
более детальной информацией я отсылаю всех заинтересованных к
технической документации OS QNX. Смотрим это
в действии:
/root/ForkThread # cli -p9006
-n20
TCP port = 9006, number of echoes =
20
time of reply - Cycles [usec.]
:
828384[1549]
139615[261]
142050[265]
144799[270] 143895[269]
146760[274]
142760[267]
145951[272]
142816[267] 144384[270]
144657[270]
159474[298]
147504[275]
147113[275] 145257[271]
146866[274]
153215[286]
145461[272]
145013[271] 145311[271]
Результаты очень близкие к максимально возможным! Так же, как
и в предыдущих случаях - очень ярко выражен эффект кэширования: вся
обработка ведётся на одном и том же, многократно используемом,
адресном пространстве.
Посмотрим <чередование> tid
обрабатывающих потоков:
/root/ForkThread # ech21
15315315311731731731731731731776176176176176176176
Хорошо видно, что через некоторое время работы число потоков
в пуле стабилизируется на уровне 7-ми (<верхней ватерлинии>). Через
какое-то время выполнения состояние пула будет примерно
таким:
/ # pidin
...
10059820 1
./ech21
10r REPLY
94228
10059820 2
./ech21
10r REPLY
94228
10059820 3
./ech21
10r REPLY
94228
10059820 4
./ech21
10r REPLY
94228
10059820 5
./ech21
10r REPLY
94228
10059820 6
./ech21
10r REPLY
94228
10059820 7
./ech21
10r REPLY
94228
...
Как
и предсказывает документация - мы имеем 7 блокированных на accept потоков - по <верхнюю
ватерлинию>.
Достаточно интересно посмотреть состояния ожидающих сокетов
при запущенных всех (или почти всех) видах описанных выше серверов -
вот возможное начало таблицы netstat:
/root/ForkThread # netstat
-a
Active Internet connections (including
servers)
Proto Recv-Q Send-Q Local
Address
Foreign Address
State
tcp
0 0
*.9005
*.*
LISTEN
tcp
0 0
*.9003
*.*
LISTEN
tcp
0 0
*.9006
*.*
LISTEN
tcp
0 0
*.9002
*.*
LISTEN
tcp
0 0
*.9001
*.*
LISTEN
tcp
0 0
*.9000
*.*
LISTEN
tcp
0 0
*.ech3
*.*
LISTEN
10.
Итоги. Выше рассмотрено 7 различных альтернативных технологий
построения сервера TCP/IP. Сравним средние характеристики вариантов по
критерию <время задержки реакции> (представляют интерес только
порядки величин, сами значения могут радикально <гулять> в
зависимости от конкретного вида серверной функции):
Тип сервера |
Среднее время
обслуживания |
Время латентности |
Последовательный - п.3 |
135 000 |
0 |
fork - п.4 |
>>1 470 000 |
>>1 000 000 |
pre-fork - п.5 |
133 000 |
0 |
inetd - п.6 |
14 100 000 |
14 000 000 |
thread - п.7 |
267 000 |
130 000 |
pre-thread - п.8 |
140 000 |
5 000
(~0) |
thread pool - п.9 |
144 000 |
9 000
(~0) |
Тем
не менее, не следует категорически руководствоваться выбором той или
иной технологии построения сервера только исходя из содержимого
показанной выше таблицы. В каждом конкретном случае при выборе
решения должно учитываться существенно больше факторов: трудоёмкость
реализации, потребление ресурсов, в частности RAM (которое мы никак не затрагиваем в нашем
рассмотрении), простота отладки и сопровождения etc.
P.S. 1. Все упоминаемые в тексте элементы
программного кода, или необходимые для их сборки элементы (Makefile) содержаться в составе прилагаемого
проекта echsrv.tgz.
2.
Материал данного рассмотрения непосредственно произошёл от
обсуждения подобных вопросов на форуме http://qnx.org.ru/forum тема
<fork или thread> :
т.е., в первую очередь, автор приносит свои благодарности всем,
принявшим участие в обсуждении. Во-вторых - все обсуждавшие данную
тему в форуме, являются соавторами предлагаемого материала в той же
мере, как и автор, указанный в титуле статьи, а их полное поимённое
перечисление : я опускаю только в силу их многочисленности.
Источник: http://codenet.ru |