30 #include <netinet/in.h>
40 #include <sys/select.h>
41 #include <sys/socket.h>
58 long swaplw(
int*,
int,
int);
75 MRevBuffer::MRevBuffer(Int_t iMode)
84 , iBufSizeAlloc(512000)
97 , piBuf(new int[iBufSizeAlloc / sizeof(int) + 1])
115 LOG(debug) <<
"-I- client runs in debug mode (1)";
116 }
else if (iDebug == 2)
117 LOG(debug) <<
"-I- client shows buffer numbers and select/receive (mode 2)";
118 else if (iDebug == 3) {
119 LOG(debug) <<
"-I- client shows buffer numbers (mode 3)";
120 }
else if (iDebug == 5) {
121 LOG(debug) <<
"-I- client shows event parameters (mode 5)";
125 LOG(debug) <<
" check ENDIAN, ";
127 LOG(debug) <<
" should be BIG_ENDIAN: ";
130 LOG(debug) <<
" should be LITTLE_ENDIAN: ";
134 LOG(debug) <<
" BIG_ENDIAN";
136 LOG(debug) <<
" LITTLE_ENDIAN";
143 LOG(debug) <<
"-D- buffer allocated (" << iBufSizeAlloc +
sizeof(int) <<
" byte)";
165 LOG(info) <<
"-E- number of requested events (" << iEvent <<
") invalid";
170 LOG(info) <<
"-I- unlimited no. of MBS events requested - break with 'CTL C'";
174 LOG(info) <<
"-E- old event server (port no. 6001) running on DAQ frontend not yet supported";
175 LOG(info) <<
" use stream server (port no. 6002) and remote event server (port no. 6003)";
194 LOG(info) <<
"-I- open connection to server " << pNode <<
":" << iPort;
196 pTSocket =
new TSocket(pNode, iPort);
197 if (!pTSocket->IsValid()) {
198 LOG(info) <<
"-E- open connection to server " << pNode <<
" failed";
201 LOG(info) <<
" connection to server " << pNode <<
":" << iPort <<
" okay";
203 iSocket = pTSocket->GetDescriptor();
206 LOG(debug) <<
" socket " << iSocket;
209 }
else if (iDebug == 1) {
210 LOG(debug) <<
"-D- socket " << iSocket;
223 pEvent =
RevGet(pSocket, iFlush, 0);
235 Int_t iint =
sizeof(int);
237 Int_t ilen, iselen, iselen1, ielen, inew = 0;
241 Char_t cMsg[128] =
"";
247 Int_t iRetryMax = 1000;
249 Int_t iRetryRecv = 0;
250 Int_t iRetryRecvLim = 1;
254 Int_t iCommSize =
sizeof(sComm);
260 Int_t iInfoSize =
sizeof(sInfo);
269 pEvtHead = &sEvtHead;
271 pSEvtHead = &sSEvtHead;
275 if (iEvtNo >= iEvtMax) {
279 piComm = &(sComm.
iSize);
280 sComm.
iSize = htonl(iCommSize - iint);
281 sComm.
iMode = htonl(1);
288 LOG(debug) <<
"-D- commbuf (data size " << ntohl(sComm.
iSize) <<
" byte): mode(1) " << ntohl(sComm.
iMode)
289 <<
", request " << ntohl(sComm.
iBufRequ) <<
" event buffer(s)";
292 ilen = pSocket->SendRaw(piComm, iCommSize, kDefault);
294 LOG(info) <<
"-E- sending request for events to server, rc = " << ilen;
300 LOG(debug) <<
" communication buffer sent (request info buffer) ";
303 piInfo = &(sInfo.
iSize);
304 ilen = pSocket->RecvRaw(piInfo, iInfoSize, kDefault);
306 LOG(info) <<
"-E- receiving info buffer from server, rc = " << ilen;
314 LOG(debug) <<
"-D- info buffer received:";
315 LOG(debug) <<
" size data " << ntohl(sInfo.
iSize) <<
", mode (1) " << ntohl(sInfo.
iMode)
316 <<
", header parms " << iHeadPar <<
", timeout " <<
iTimeOut;
319 if ((ntohl(sInfo.
iMode) != 1) || (static_cast<int>(ntohl(sInfo.
iSize)) != iInfoSize - iint)) {
320 LOG(info) <<
"-E- invalid info buffer received: ";
321 LOG(info) <<
" size data ( " << iInfoSize - iint <<
") " << ntohl(sInfo.
iSize) <<
", mode (1) "
322 << ntohl(sInfo.
iMode) <<
", header parms " << iHeadPar <<
", timeout " <<
iTimeOut;
335 LOG(debug) <<
"-D- skip current buffer";
351 if (iEvtRel + ii >= iEvtBuf) {
353 LOG(debug) <<
"-D- request new buffer";
357 piNextEvt += iEvtPar;
358 LOG(debug) <<
"-D- next 40 2byte-words of buffer:";
359 psNextEvt =
reinterpret_cast<short*
>(piNextEvt);
360 for (Int_t iii = 0; iii < 40; iii++) {
361 LOG(debug) <<
" " << iii + 1 <<
": " << psNextEvt[iii];
382 ilen = pSocket->SendRaw(piComm, iCommSize, kDefault);
384 LOG(info) <<
"-E- sending request for buffer " << iBufNo + 1 <<
" to server, rc = " << ilen;
394 LOG(debug) <<
"-D- communication buffer sent (request next buffer) ";
400 pcBuf =
reinterpret_cast<char*
>(piBuf);
402 if ((
imySig == -1) && (iDebug)) {
403 LOG(debug) <<
" CTL C detected (before recv len)";
406 iRC = recv(iSocket, pcBuf, iSize, 0);
410 sprintf(cMsg,
"\n-E- receiving data length from server");
412 LOG(debug) <<
" CTL C detected (during recv len)";
416 sprintf(cMsg,
"\n-E- receiving data length from server");
419 LOG(debug) <<
" retry";
423 if (iRetryRecv > iRetryRecvLim) {
432 if ((iDebug == 2) || (iDebug == 3)) {
435 LOG(info) <<
"-E- receiving data length: connection closed by server";
451 if ((
imySig == -1) && (iDebug)) {
452 LOG(debug) <<
" CTL C detected (after recv len)";
455 iBufSize = ntohl(piBuf[0]);
457 LOG(debug) <<
" data size received: " << iBufSize;
465 LOG(info) <<
"-W- server closed connection";
466 LOG(info) <<
" " << iEvtNo <<
" of " << iEvtMax <<
" events received";
471 if (iBufSize == -1) {
474 <<
"-E- no data length received: ";
478 if (iRetry > iRetryMax) {
479 LOG(info) << iRetryMax <<
"times";
486 <<
"-E- invalid data length received: " << iBufSize;
493 LOG(info) << iRetry <<
"times";
497 if (iBufSize + iint > iBufSizeAlloc) {
499 iBufSizeAlloc = iBufSize + iint;
501 piBuf =
new int[iBufSizeAlloc / iint];
505 LOG(debug) <<
"-I- total buffer increased to " << iBufSizeAlloc <<
" byte";
511 pcBuf =
reinterpret_cast<char*
>(&(piBuf[1]));
513 if ((
imySig == -1) && (iDebug)) {
514 LOG(debug) <<
" CTL C detected (before recv data)";
517 iRC = recv(iSocket, pcBuf, iSize, 0);
521 sprintf(cMsg,
"\n-E- receiving data from server");
523 LOG(debug) <<
" CTL C detected (during recv data)";
527 sprintf(cMsg,
"\n-E- receiving data from server");
531 if (iRetryRecv > iRetryRecvLim) {
540 if ((iDebug == 2) || (iDebug == 3)) {
543 LOG(info) <<
"-E- receiving data: connection closed by server";
561 LOG(debug) <<
" CTL C detected (after recv data)";
567 if (iBufSize == (
sizeof(
sptrevDummy) -
sizeof(
int))) {
568 iBufNoServ = ntohl(piBuf[1]);
569 iEvtBuf = ntohl(piBuf[2]);
572 printf(
" dummy buffer no. %d, %d events\n", iBufNoServ, iEvtBuf);
576 LOG(debug) <<
"*** connection to remote event server okay, but currently no DAQ events (" << iBufNoServ
581 LOG(info) <<
"-E- invalid event number in dummy buffer no. " << iBufNoServ <<
": " << iEvtBuf
599 swaplw(&piBuf[1], iBufSize / iint, 0);
600 if ((iBufNo == 0) && (iDebug)) {
601 LOG(debug) <<
" Event data swapped";
606 iBufNoServ = piBuf[4];
611 iBufNo1 = iBufNoServ;
617 iBufNo2 = iBufNoServ;
620 printf(
"%d:", iBufNoServ);
627 <<
"buffer " << iBufNo <<
" (" << iBufNoServ <<
"): "
628 <<
" size " << iBufSize <<
" byte";
630 LOG(debug) <<
" last event fragment";
633 LOG(debug) <<
" first event fragment";
635 LOG(debug) <<
" buffer contains " << iEvtBuf <<
" elements";
640 if ((iEvtNo > 0) && (iFragBegin)) {
651 LOG(debug) <<
"-D- first 50 2byte-words of buffer:";
652 psNextEvt =
reinterpret_cast<short*
>(&piBuf[1]);
653 for (Int_t iii = 0; iii < 50; iii++) {
654 LOG(debug) <<
" " << iii + 1 <<
": " << psNextEvt[iii];
659 piNextEvt = piBuf + iHeadPar + 1;
660 iEvtPar = piNextEvt[0] / 2 + 2;
663 piNextEvt += iEvtPar;
664 iEvtPar = piNextEvt[0] / 2 + 2;
670 piNextEvt += iEvtPar;
671 iEvtPar = piNextEvt[0] / 2 + 2;
676 iBufNoServ = piBuf[4];
677 iBufNo1 = iBufNoServ;
696 psNextEvt =
reinterpret_cast<short*
>(piNextEvt);
697 pEvtHead =
reinterpret_cast<sMbsEv101*
>(piNextEvt);
699 pSEvtHead =
reinterpret_cast<sMbsSev101*
>(&piNextEvt[4]);
700 pshort =
reinterpret_cast<short*
>(pSEvtHead);
704 if ((iDebug == 1) || (iDebug == 5)) {
714 pEvt->
pSubEvt[0] =
reinterpret_cast<Int_t*
>(&pshort[6]);
726 ielen -= (iselen1 + 8);
733 pshort += iselen + 4;
734 pSEvtHead =
reinterpret_cast<sMbsSev101*
>(pshort);
738 ielen -= (iselen + 4);
747 pEvt->
pSubEvt[ii - 1] =
reinterpret_cast<Int_t*
>(&pshort[6]);
753 iHead[0] = piNextEvt[0];
754 iHead[1] = piNextEvt[3];
762 <<
"-D- CTL C specified";
764 LOG(debug) <<
" (at end RevGet)";
771 if (iEvtNo == iEvtMax) {
773 <<
"-I- all required events (" << iEvtMax <<
") received: " << iBufNo <<
" buffers (" << iBufNo1
774 <<
" - " << iBufNo2 <<
")";
775 LOG(info) <<
" fragments found: " << iFragBeginIgn <<
" begin, " << iFragEndIgn <<
" end";
777 LOG(info) <<
" " << iFragConc <<
" events not concatenated from fragments";
784 if ((iError) || (
imySig == -1)) {
786 LOG(debug) <<
" RevGet: closing connection to server";
788 iRC =
rclose(&iSocket, 2);
789 if ((iDebug) && (iRC == 0)) {
790 LOG(debug) <<
" - done";
800 }
else if (iDebug == 1) {
801 LOG(debug) <<
" RevGet: keeping connection to server";
822 LOG(info) <<
"-I- *** Last request for events was successfull";
825 LOG(info) <<
"-I- *** Remote event server not yet connected";
828 LOG(info) <<
"-I- *** Remote event server connected, but still no request for events";
831 LOG(info) <<
"-I- *** Connection to remote event server okay, but currently no DAQ events";
834 LOG(info) <<
"-I- *** Connection to remote event server closed";
837 LOG(info) <<
"-I- *** Connection to remote event server closed after user break (CTL C)";
840 LOG(info) <<
"-I- *** Connection to remote event server closed after failure";
843 LOG(info) <<
"-E- Invalid status remote event server found: " << iStatus;
854 Int_t iCommSize =
sizeof(sComm);
864 piComm = &(sComm.
iSize);
865 sComm.
iSize = htonl(iCommSize -
sizeof(
int));
866 sComm.
iMode = htonl(1);
871 LOG(debug) <<
"-D- send close request (data size " << ntohl(sComm.
iSize) <<
" byte): " << ntohl(sComm.
iMode)
874 iRC = pSocket->SendRaw(piComm, iCommSize, kDefault);
876 LOG(info) <<
"-E- sending close request to server, rc = " << iRC;
877 else if (iDebug == 1) {
878 LOG(debug) <<
" close request sent";
882 LOG(debug) <<
" RevClose: closing connection to server";
884 iRC =
rclose(&iSocket, 2);
885 if ((iDebug) && (iRC == 0)) {
886 LOG(debug) <<
" - done";
891 LOG(info) <<
"-I- connection to server closed";
936 if ((iChan < 1) || (iChan > (iSize + 4) / (static_cast<signed>(
sizeof(
short))))) {
937 LOG(info) <<
"-E- event parameter number " << iChan <<
" out of range ("
938 << (iSize + 4) / (static_cast<signed>(
sizeof(
short))) <<
" long words)";
942 iValue = pint[iChan - 1];
Short_t subEvtSubType[100]
Int_t RevStatus(Int_t iOut)
void ReFillData(Int_t *pData)
Int_t * RevGetI(TSocket *pSocket, Int_t iFlush)
Short_t subEvtProcId[100]
void RevClose(TSocket *pSocket)
unsigned char cMbsSev101_subcrate
unsigned char cBuf_fragBegin
Short_t subEvtSubCrate[100]
ClassImp(FairEventBuilder)
Short_t subEvtControl[100]
REvent * RevGet(TSocket *pSocket, Int_t iFlush, Int_t iSkip)
TSocket * RevOpen(char *pNode, Int_t iPort, Int_t iEvent)
Int_t ReGetData(Int_t ichan)
long swaplw(int *, int, int)
void RevBufWait(Int_t iWait)
unsigned char cMbsSev101_control
void ReFillHead(Int_t *pHead)
unsigned char cBuf_fragEnd