FairRoot
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
MRevBuffer.cxx
Go to the documentation of this file.
1 /********************************************************************************
2  * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
9 // Copyright:
10 // GSI, Gesellschaft fuer Schwerionenforschung mbH
11 // Planckstr. 1
12 // D-64291 Darmstadt
13 // Germany
14 // created 16. 2.1999 by Horst Goeringer
16 // MRevBuffer.cc
17 // ROOT client package for multithreaded remote event server (MBS)
19 // 15. 3.1999, H.G.: RecvRaw replaced by original recv calls
20 // 26. 7.1999, H.G.: accept dummy buffers (no events from DAQ)
21 // 20. 8.1999, H.G.: MRevBuffer::RevGetI added
22 // 3. 3.2000, H.G.: change default port no.: 6013 -> 6003
23 // 6. 3.2000, H.G.: new member function RevStatus
24 // 19.11.2001, H.G.: increase data buffer dynamically
26 
27 #include "MRevBuffer.h" // class definition
28 
29 #include <TSocket.h> // for TSocket, etc
30 #include <netinet/in.h> // IWYU pragma: keep
31 #include <signal.h> // IWYU pragma: keep
32 #include <time.h> // IWYU pragma: keep
33 #include <unistd.h> // IWYU pragma: keep
34 
35 #ifdef Linux
36 #include <select.h> // IWYU pragma: keep
37 #include <socket.h> // IWYU pragma: keep
38 #else // AIX
39 #include <strings.h> // IWYU pragma: keep
40 #include <sys/select.h> // IWYU pragma: keep
41 #include <sys/socket.h> // IWYU pragma: keep
42 #endif
43 
44 #include "ptrevcomm.h" // communication structure
45 #include "ptrevmbsdef.h" // MBS data definitions
46 
47 // IWYU pragma: no_include <stdio.h>
48 // IWYU pragma: no_include <sys/_endian.h>
49 // IWYU pragma: no_include <sys/signal.h>
50 
51 #include "FairLogger.h"
52 
55 
56 extern "C"
57 {
58  long swaplw(int*, int, int); // swap data
59  void exitCli(int); // handle CTL C
60  int rclose(int*, int); // close connection to server
61 }
62 
63 #ifdef Linux
64 struct timeval
65 {
66  long tv_sec;
67  long tv_usec;
68 };
69 #endif
70 
71 int iTimeOut; // needed in exitCli to handle CTL C
72 int imySig = 0; // needed in exitCli to handle CTL C
73 int iOutMode = 0; // needed in exitCli to handle CTL C
74 
75 MRevBuffer::MRevBuffer(Int_t iMode)
76  : TObject()
77  , pTSocket(nullptr)
78  , iSocket(0)
79  , iBufNo1(0)
80  , iBufNo2(0)
81  , iDebug(iMode)
82  , iSwap(0)
83  , iStatus(1)
84  , iBufSizeAlloc(512000)
85  , iBufSize(0)
86  , iBufNo(0)
87  , iFragBegin(0)
88  , iFragConc(0)
89  , iFragBeginIgn(0)
90  , iFragEndIgn(0)
91  , iHeadPar(0)
92  , iEvtMax(0)
93  , iEvtNo(0)
94  , iEvtRel(0)
95  , iEvtBuf(0)
96  , iEvtPar(0)
97  , piBuf(new int[iBufSizeAlloc / sizeof(int) + 1])
98  , piNextEvt(nullptr)
99  , pEvt(new REvent())
100 {
101  // iStatus = 1; // server not yet connected
102  /*
103  iSwap = 0;
104  iSocket = 0;
105  iEvtRel = 0;
106  iEvtPar = 0;
107  iBufSize = 0;
108  */
109 
110  signal(SIGINT, exitCli); // from now catch CTL C
111 
112  // iDebug = iMode;
113  iOutMode = iMode;
114  if (iDebug == 1) {
115  LOG(debug) << "-I- client runs in debug mode (1)";
116  } else if (iDebug == 2)
117  LOG(debug) << "-I- client shows buffer numbers and select/receive (mode 2)";
118  else if (iDebug == 3) {
119  LOG(debug) << "-I- client shows buffer numbers (mode 3)";
120  } else if (iDebug == 5) {
121  LOG(debug) << "-I- client shows event parameters (mode 5)";
122  }
123 
124  if (iDebug == 1) {
125  LOG(debug) << " check ENDIAN, ";
126 #ifdef _AIX
127  LOG(debug) << " should be BIG_ENDIAN: ";
128 #endif
129 #ifdef Linux
130  LOG(debug) << " should be LITTLE_ENDIAN: ";
131 #endif
132 
133 #ifdef BIG__ENDIAN
134  LOG(debug) << " BIG_ENDIAN";
135 #else
136  LOG(debug) << " LITTLE_ENDIAN";
137 #endif
138  }
139 
140  // iBufSizeAlloc = 16384;
141  // piBuf = new int [iBufSizeAlloc/sizeof(int)+1]; // 16k buffer + len
142  if (iDebug == 1)
143  LOG(debug) << "-D- buffer allocated (" << iBufSizeAlloc + sizeof(int) << " byte)";
144 
145  // REvent* pev = new REvent(); // create event (once)
146  // pEvt = pev; // keep pointer in class MRevBuffer
147 
148  // LOG(info) << " MRevBuffer() executed";
149 
150 } // constructor
151 
153 {
154  // LOG(info) << " ~MRevBuffer() ...";
155  delete[] piBuf;
156  piNextEvt = 0;
157  pEvt->~REvent();
158  // LOG(info) << " ~MRevBuffer() executed";
159 
160 } // destructor
161 
162 TSocket* MRevBuffer::RevOpen(char* pNode, Int_t iPort, Int_t iEvent)
163 {
164  if (iEvent < 0) {
165  LOG(info) << "-E- number of requested events (" << iEvent << ") invalid";
166  return (0);
167  }
168  if (iEvent == 0) {
169  iEvent = 2000000000; // nearly unlimited
170  LOG(info) << "-I- unlimited no. of MBS events requested - break with 'CTL C'";
171  }
172 
173  if (iPort == 6001) {
174  LOG(info) << "-E- old event server (port no. 6001) running on DAQ frontend not yet supported";
175  LOG(info) << " use stream server (port no. 6002) and remote event server (port no. 6003)";
176  return (0);
177  }
178  if (iPort == 0) {
179  iPort = 6003;
180  } // default MBS remote event server
181 
182  iEvtMax = iEvent;
183  iEvtNo = 0;
184  iBufNo = 0;
185  iBufNo1 = 0;
186  iBufNo2 = 0;
187  iFragConc = 0;
188  iFragBegin = 0;
189  iFragBeginIgn = 0;
190  iFragEndIgn = 0;
191 
192  if (!iSocket) {
193  iEvtNo = -1; // initialization (info buffer) required
194  LOG(info) << "-I- open connection to server " << pNode << ":" << iPort;
195 
196  pTSocket = new TSocket(pNode, iPort);
197  if (!pTSocket->IsValid()) {
198  LOG(info) << "-E- open connection to server " << pNode << " failed";
199  return (0);
200  }
201  LOG(info) << " connection to server " << pNode << ":" << iPort << " okay";
202 
203  iSocket = pTSocket->GetDescriptor();
204  imySig = iSocket;
205  if (iDebug == 1) {
206  LOG(debug) << " socket " << iSocket;
207  }
208 
209  } else if (iDebug == 1) {
210  LOG(debug) << "-D- socket " << iSocket;
211  }
212 
213  iStatus = 2; // server connected
214  return (pTSocket);
215 
216 } // RevOpen
217 
218 Int_t* MRevBuffer::RevGetI(TSocket* pSocket, Int_t iFlush)
219 {
220  Int_t* piEvent;
221  REvent* pEvent;
222 
223  pEvent = RevGet(pSocket, iFlush, 0);
224  if (pEvent) {
225  piEvent = piNextEvt;
226  } else {
227  piEvent = 0;
228  }
229  return piEvent;
230 
231 } // RevGetI
232 
233 REvent* MRevBuffer::RevGet(TSocket* pSocket, Int_t iFlush, Int_t)
234 {
235  Int_t iint = sizeof(int);
236  Int_t ii;
237  Int_t ilen, iselen, iselen1, ielen, inew = 0;
238 
239  Short_t* pshort;
240 
241  Char_t cMsg[128] = "";
242  Char_t* pcBuf;
243  Int_t iSize, iRC;
244 
245  Int_t iError = 0;
246  Int_t iRetry;
247  Int_t iRetryMax = 1000;
248  Int_t iRetryFirst;
249  Int_t iRetryRecv = 0; // count retries of recv call
250  Int_t iRetryRecvLim = 1; // max. no. of succeeding retries
251 
252  Int_t* piComm;
253  srevComm sComm;
254  Int_t iCommSize = sizeof(sComm); // size comm. buffer (byte)
255 
256  Int_t* piInfo;
257  // srevInfo sInfo = {.iSize=0, .iMode=1, .iHeadPar=0 , .iTimeOut=0 };
259  srevInfo sInfo = {0, 1, 0, 0};
260  Int_t iInfoSize = sizeof(sInfo); // size info buffer (byte)
261 
262  // Int_t iHeadPar = 12; // no. of params in MBS buffer header
263  iHeadPar = 12; // no. of params in MBS buffer header
264  Int_t iBufNoServ; // buffer no. sent from server
265  short* psNextEvt;
266  Int_t iHead[2];
267 
268  sMbsEv101 sEvtHead, *pEvtHead; // header event 10.1
269  pEvtHead = &sEvtHead;
270  sMbsSev101 sSEvtHead, *pSEvtHead; // header event 10.1
271  pSEvtHead = &sSEvtHead;
272  sMbsBufFrag sFrag, *pFrag; // fragmented event flags
273  pFrag = &sFrag;
274 
275  if (iEvtNo >= iEvtMax) {
276  goto gEndGet;
277  }
278 
279  piComm = &(sComm.iSize); // communication buffer
280  sComm.iSize = htonl(iCommSize - iint); // size of data following
281  sComm.iMode = htonl(1); // required: get events
282  sComm.iIdent = 1; // required: tell endian type
283  sComm.iBufRequ = htonl(1); // send one buffer
284 
285  // initialize communication with server
286  if (iEvtNo == -1) { // initialize communication with server
287  if (iDebug == 1)
288  LOG(debug) << "-D- commbuf (data size " << ntohl(sComm.iSize) << " byte): mode(1) " << ntohl(sComm.iMode)
289  << ", request " << ntohl(sComm.iBufRequ) << " event buffer(s)";
290 
291  // request event buffer from server
292  ilen = pSocket->SendRaw(piComm, iCommSize, kDefault);
293  if (ilen < 0) {
294  LOG(info) << "-E- sending request for events to server, rc = " << ilen;
295  iError = 1;
296  goto gEndGet;
297  }
298 
299  if (iDebug == 1)
300  LOG(debug) << " communication buffer sent (request info buffer) ";
301 
302  // receive info buffer from server
303  piInfo = &(sInfo.iSize);
304  ilen = pSocket->RecvRaw(piInfo, iInfoSize, kDefault);
305  if (ilen < 0) {
306  LOG(info) << "-E- receiving info buffer from server, rc = " << ilen;
307  iError = 1;
308  goto gEndGet;
309  }
310 
311  iHeadPar = ntohl(sInfo.iHeadPar);
312  iTimeOut = ntohl(sInfo.iTimeOut);
313  if (iDebug == 1) {
314  LOG(debug) << "-D- info buffer received:";
315  LOG(debug) << " size data " << ntohl(sInfo.iSize) << ", mode (1) " << ntohl(sInfo.iMode)
316  << ", header parms " << iHeadPar << ", timeout " << iTimeOut;
317  }
318 
319  if ((ntohl(sInfo.iMode) != 1) || (static_cast<int>(ntohl(sInfo.iSize)) != iInfoSize - iint)) {
320  LOG(info) << "-E- invalid info buffer received: ";
321  LOG(info) << " size data ( " << iInfoSize - iint << ") " << ntohl(sInfo.iSize) << ", mode (1) "
322  << ntohl(sInfo.iMode) << ", header parms " << iHeadPar << ", timeout " << iTimeOut;
323  iError = 1;
324  goto gEndGet;
325  }
326 
327  iEvtNo = 0; // initilization done
328  inew = 1; // request new buffer
329 
330  } // (iEvtNo == -1)
331  else {
332  if (iFlush) {
333  inew = 1; // request new buffer
334  if (iDebug == 1) {
335  LOG(debug) << "-D- skip current buffer";
336  }
337  } else {
338  if (iEvtNo >= 0) {
339  // refresh some buffer infos not stored in class data
340  pFrag = reinterpret_cast<sMbsBufFrag*>(&piBuf[3]);
341 
342  // check if new buffer needed
343  ii = 0; // count buffer header
344  if (pFrag->cBuf_fragBegin) {
345  ii++;
346  }
347  if (pFrag->cBuf_fragEnd) {
348  ii++;
349  }
350 
351  if (iEvtRel + ii >= iEvtBuf) {
352  if (iDebug == 1) {
353  LOG(debug) << "-D- request new buffer";
354  }
355  inew = 1;
356  if (iDebug == -1) {
357  piNextEvt += iEvtPar; // skip previous event
358  LOG(debug) << "-D- next 40 2byte-words of buffer:";
359  psNextEvt = reinterpret_cast<short*>(piNextEvt);
360  for (Int_t iii = 0; iii < 40; iii++) {
361  LOG(debug) << " " << iii + 1 << ": " << psNextEvt[iii];
362  }
363  }
364  } else {
365  inew = 0;
366  }
367 
368  } // (iEvtNo > 0)
369  } // (!iFlush)
370  } // (iEvtNo != -1)
371 
372  // request new buffer
373  if (inew) {
374  iEvtRel = 0;
375  iRetry = 0;
376  iRetryFirst = 1;
377  if (imySig == -1) {
378  sComm.iBufRequ = htonl(0); // signal finish to server
379  }
380 
381  // request next buffer or finish
382  ilen = pSocket->SendRaw(piComm, iCommSize, kDefault);
383  if (ilen < 0) {
384  LOG(info) << "-E- sending request for buffer " << iBufNo + 1 << " to server, rc = " << ilen;
385  iError = 1;
386  goto gEndGet;
387  }
388 
389  if (imySig == -1) {
390  goto gEndGet;
391  }
392 
393  if (iDebug == 1)
394  LOG(debug) << "-D- communication buffer sent (request next buffer) ";
395 
396  gRetryLen:
397  // get size of data following
398  piBuf[0] = -1; // enables receive check
399  iSize = iint;
400  pcBuf = reinterpret_cast<char*>(piBuf);
401  while (iSize > 0) {
402  if ((imySig == -1) && (iDebug)) {
403  LOG(debug) << " CTL C detected (before recv len)";
404  }
405  gNextRecvL:
406  iRC = recv(iSocket, pcBuf, iSize, 0);
407  if (iRC < 0) {
408  if (imySig == -1) {
409  if (iDebug) {
410  sprintf(cMsg, "\n-E- receiving data length from server");
411  perror(cMsg);
412  LOG(debug) << " CTL C detected (during recv len)";
413  }
414  goto gNextRecvL;
415  } else { // a real problem
416  sprintf(cMsg, "\n-E- receiving data length from server");
417  perror(cMsg);
418  if (iDebug) {
419  LOG(debug) << " retry";
420  }
421 
422  iRetryRecv++; // count no. of retries to limit them
423  if (iRetryRecv > iRetryRecvLim) { // avoid infinite loop
424  iError = 1;
425  goto gEndGet;
426  } else {
427  goto gNextRecvL;
428  }
429  }
430  }
431  if (iRC == 0) {
432  if ((iDebug == 2) || (iDebug == 3)) {
433  LOG(debug) << "\n";
434  }
435  LOG(info) << "-E- receiving data length: connection closed by server";
436  iError = 1;
437  goto gEndGet;
438  }
439 
440  iRetryRecv = 0;
441  iSize -= iRC;
442  pcBuf += iRC;
443 
444  } /* while(iSize > 0) */
445 
446  if (iDebug == 2) {
447  printf("Rl:");
448  fflush(stdout);
449  }
450 
451  if ((imySig == -1) && (iDebug)) {
452  LOG(debug) << " CTL C detected (after recv len)";
453  }
454 
455  iBufSize = ntohl(piBuf[0]);
456  if (iDebug == 1) {
457  LOG(debug) << " data size received: " << iBufSize;
458  }
459 
460  if (iBufSize <= 0) {
461  if (iBufSize == 0) {
462  if (iDebug) {
463  LOG(debug) << "\n";
464  }
465  LOG(info) << "-W- server closed connection";
466  LOG(info) << " " << iEvtNo << " of " << iEvtMax << " events received";
467  iError = 1;
468  goto gEndGet;
469  }
470 
471  if (iBufSize == -1) {
472  if (iRetryFirst) {
473  LOG(info) << "\n"
474  << "-E- no data length received: ";
475  iRetryFirst = 0;
476  }
477  iRetry++;
478  if (iRetry > iRetryMax) {
479  LOG(info) << iRetryMax << "times";
480  iError = 1;
481  goto gEndGet;
482  }
483  goto gRetryLen;
484  } else {
485  LOG(info) << "\n"
486  << "-E- invalid data length received: " << iBufSize;
487  iError = 1;
488  }
489 
490  goto gEndGet;
491  }
492  if (iRetry) {
493  LOG(info) << iRetry << "times";
494  }
495 
496  // increase data buffer, if necessary
497  if (iBufSize + iint > iBufSizeAlloc) {
498  delete[] piBuf;
499  iBufSizeAlloc = iBufSize + iint;
500  // new total buffer size (including length field)
501  piBuf = new int[iBufSizeAlloc / iint];
502  piBuf[0] = iBufSize;
503  // keep sent buffer size (without length field)
504  if (iDebug == 1)
505  LOG(debug) << "-I- total buffer increased to " << iBufSizeAlloc << " byte";
506  }
507 
508  // get event buffer without length field
509  piBuf[1] = -1; // enables receive check
510  iSize = iBufSize;
511  pcBuf = reinterpret_cast<char*>(&(piBuf[1]));
512  while (iSize > 0) {
513  if ((imySig == -1) && (iDebug)) {
514  LOG(debug) << " CTL C detected (before recv data)";
515  }
516  gNextRecvD:
517  iRC = recv(iSocket, pcBuf, iSize, 0);
518  if (iRC < 0) {
519  if (imySig == -1) {
520  if (iDebug) {
521  sprintf(cMsg, "\n-E- receiving data from server");
522  perror(cMsg);
523  LOG(debug) << " CTL C detected (during recv data)";
524  }
525  goto gNextRecvD;
526  } else { // a real problem
527  sprintf(cMsg, "\n-E- receiving data from server");
528  perror(cMsg);
529 
530  iRetryRecv++; // count no. of retries to limit them
531  if (iRetryRecv > iRetryRecvLim) { // avoid infinite loop
532  iError = 1;
533  goto gEndGet;
534  } else {
535  goto gNextRecvD;
536  }
537  }
538  }
539  if (iRC == 0) {
540  if ((iDebug == 2) || (iDebug == 3)) {
541  LOG(debug) << "\n";
542  }
543  LOG(info) << "-E- receiving data: connection closed by server";
544  iError = 1;
545  goto gEndGet;
546  }
547 
548  iRetryRecv = 0;
549  iSize -= iRC;
550  pcBuf += iRC;
551 
552  } /* while(iSize > 0) */
553 
554  if (iDebug == 2) {
555  printf("Rd:");
556  fflush(stdout);
557  }
558 
559  if (imySig == -1) {
560  if (iDebug) {
561  LOG(debug) << " CTL C detected (after recv data)";
562  }
563  goto gEndGet;
564  }
565 
566  // test for dummy buffer (no DAQ events available)
567  if (iBufSize == (sizeof(sptrevDummy) - sizeof(int))) {
568  iBufNoServ = ntohl(piBuf[1]);
569  iEvtBuf = ntohl(piBuf[2]);
570  if (iEvtBuf == 0) {
571  if (iDebug == 1)
572  printf(" dummy buffer no. %d, %d events\n", iBufNoServ, iEvtBuf);
573  if (iDebug == 3) {
574  LOG(info) << "\n";
575  }
576  LOG(debug) << "*** connection to remote event server okay, but currently no DAQ events (" << iBufNoServ
577  << ")";
578  iStatus = 3;
579  goto gRetryLen;
580  } else {
581  LOG(info) << "-E- invalid event number in dummy buffer no. " << iBufNoServ << ": " << iEvtBuf
582  << " (expected: 0)";
583  iError = 1;
584  goto gEndGet;
585  }
586  }
587 
588  if (!iSwap) {
589  if (piBuf[9] != 1) {
590  iSwap = 1;
591  }
592  }
593 
594  // Long_t lRC;
595 
596  /* swap MBS buffer */
597  if (iSwap) {
598  // lRC = swaplw( &piBuf[1], iBufSize/iint, 0);
599  swaplw(&piBuf[1], iBufSize / iint, 0);
600  if ((iBufNo == 0) && (iDebug)) {
601  LOG(debug) << " Event data swapped";
602  }
603  }
604 
605  iBufNo++;
606  iBufNoServ = piBuf[4];
607  iEvtBuf = piBuf[5]; // no. of events in current buffer
608 
609  if (iEvtNo == 0) {
610  iBufNo = 1; // restart counting
611  iBufNo1 = iBufNoServ; // keep first buffer no.
612  iFragBegin = 0;
613  iFragBeginIgn = 0;
614  iFragEndIgn = 0;
615  iFragConc = 0;
616  }
617  iBufNo2 = iBufNoServ; // keep last buffer no.
618 
619  if (iDebug >= 2) {
620  printf("%d:", iBufNoServ);
621  fflush(stdout);
622  }
623 
624  pFrag = reinterpret_cast<sMbsBufFrag*>(&piBuf[3]);
625  if (iDebug == 1) {
626  LOG(debug) << "\n"
627  << "buffer " << iBufNo << " (" << iBufNoServ << "): "
628  << " size " << iBufSize << " byte";
629  if (pFrag->cBuf_fragBegin) {
630  LOG(debug) << " last event fragment";
631  }
632  if (pFrag->cBuf_fragEnd) {
633  LOG(debug) << " first event fragment";
634  }
635  LOG(debug) << " buffer contains " << iEvtBuf << " elements";
636  }
637 
638  if (pFrag->cBuf_fragEnd) {
639  iFragEndIgn++;
640  if ((iEvtNo > 0) && (iFragBegin)) {
641  iFragConc++;
642  }
643  }
644 
645  if (pFrag->cBuf_fragBegin) {
646  iFragBegin = 1; // keep info for next buffer
647  iFragBeginIgn++;
648  }
649 
650  if (iDebug == -1) {
651  LOG(debug) << "-D- first 50 2byte-words of buffer:";
652  psNextEvt = reinterpret_cast<short*>(&piBuf[1]);
653  for (Int_t iii = 0; iii < 50; iii++) {
654  LOG(debug) << " " << iii + 1 << ": " << psNextEvt[iii];
655  }
656  }
657 
658  iEvtRel = 1; // first event in buffer
659  piNextEvt = piBuf + iHeadPar + 1; // ptr first element in buffer
660  iEvtPar = piNextEvt[0] / 2 + 2; // no. of parameters new event
661 
662  if (pFrag->cBuf_fragEnd) {
663  piNextEvt += iEvtPar; // skip fragment end
664  iEvtPar = piNextEvt[0] / 2 + 2; // no. of parameters new event
665  }
666 
667  } // new buffer
668  else {
669  iEvtRel++; // event no. in buffer
670  piNextEvt += iEvtPar; // skip previous event
671  iEvtPar = piNextEvt[0] / 2 + 2; // no. of parameters new event
672 
673  // also if starting with current buffer: keep first buffer no.
674  if (iEvtNo == 0) {
675  iBufNo = 1; // restart counting
676  iBufNoServ = piBuf[4];
677  iBufNo1 = iBufNoServ;
678 
679  iFragBegin = 0;
680  iFragBeginIgn = 0;
681  iFragEndIgn = 0;
682  iFragConc = 0;
683  pFrag = reinterpret_cast<sMbsBufFrag*>(&piBuf[3]);
684  if (pFrag->cBuf_fragBegin) {
685  iFragBegin = 1; // keep info for next buffer
686  iFragBeginIgn++;
687  }
688  if (pFrag->cBuf_fragEnd) {
689  iFragEndIgn++;
690  }
691 
692  } // (iEvtNo == 0)
693  } // continue with current buffer
694 
695  iEvtNo++; // total event no.
696  psNextEvt = reinterpret_cast<short*>(piNextEvt);
697  pEvtHead = reinterpret_cast<sMbsEv101*>(piNextEvt);
698  ielen = pEvtHead->iMbsEv101_dlen;
699  pSEvtHead = reinterpret_cast<sMbsSev101*>(&piNextEvt[4]);
700  pshort = reinterpret_cast<short*>(pSEvtHead);
701 
702  pEvt->nSubEvt = 0;
703 
704  if ((iDebug == 1) || (iDebug == 5)) {
705  iselen1 = pSEvtHead->iMbsSev101_dlen;
706 
707  pEvt->nSubEvt += 1;
708  pEvt->subEvtSize[0] = pSEvtHead->iMbsSev101_dlen / 2 - 1;
709  pEvt->subEvtType[0] = pSEvtHead->sMbsSev101_type;
710  pEvt->subEvtSubType[0] = pSEvtHead->sMbsSev101_subtype;
711  pEvt->subEvtProcId[0] = pSEvtHead->sMbsSev101_procid;
712  pEvt->subEvtSubCrate[0] = pSEvtHead->cMbsSev101_subcrate;
713  pEvt->subEvtControl[0] = pSEvtHead->cMbsSev101_control;
714  pEvt->pSubEvt[0] = reinterpret_cast<Int_t*>(&pshort[6]);
715 
716  /*
717  std::stringstream ss
718  ss << " evt " << iEvtNo << " (" << piNextEvt[3]
719  << "), len " << pEvtHead->iMbsEv101_dlen
720  << ", type " << pEvtHead->sMbsEv101_type
721  << "." << pEvtHead->sMbsEv101_subtype
722  << ", trigger " << pEvtHead->sMbsEv101_trigger;
723  ss << ", SE1 len " << iselen1
724  << " procid " << pSEvtHead->sMbsSev101_procid;
725 */
726  ielen -= (iselen1 + 8);
727 
728  ii = 1;
729  iselen = iselen1;
730  while (ielen > 0) {
731  ii++;
732  // if (ii > 3) break;
733  pshort += iselen + 4;
734  pSEvtHead = reinterpret_cast<sMbsSev101*>(pshort);
735  iselen = pSEvtHead->iMbsSev101_dlen;
736  // ss << ", SE" << ii << " " << iselen
737  // << " " << pSEvtHead->sMbsSev101_procid;
738  ielen -= (iselen + 4);
739 
740  pEvt->nSubEvt += 1;
741  pEvt->subEvtSize[ii - 1] = pSEvtHead->iMbsSev101_dlen / 2 - 1;
742  pEvt->subEvtType[ii - 1] = pSEvtHead->sMbsSev101_type;
743  pEvt->subEvtSubType[ii - 1] = pSEvtHead->sMbsSev101_subtype;
744  pEvt->subEvtProcId[ii - 1] = pSEvtHead->sMbsSev101_procid;
745  pEvt->subEvtSubCrate[ii - 1] = pSEvtHead->cMbsSev101_subcrate;
746  pEvt->subEvtControl[ii - 1] = pSEvtHead->cMbsSev101_control;
747  pEvt->pSubEvt[ii - 1] = reinterpret_cast<Int_t*>(&pshort[6]);
748  }
749  // LOG(debug) << ss.str();
750  }
751 
752  // fill event header
753  iHead[0] = piNextEvt[0]; // event length
754  iHead[1] = piNextEvt[3]; // event number
755  pEvt->ReFillHead(iHead);
756 
757  // fill event data
758  pEvt->ReFillData(piNextEvt);
759 
760  if (imySig == -1) {
761  LOG(info) << "\n"
762  << "-D- CTL C specified";
763  if (iDebug) {
764  LOG(debug) << " (at end RevGet)";
765  } else {
766  LOG(info) << "\n";
767  }
768  goto gEndGet;
769  }
770 
771  if (iEvtNo == iEvtMax) {
772  LOG(info) << "\n"
773  << "-I- all required events (" << iEvtMax << ") received: " << iBufNo << " buffers (" << iBufNo1
774  << " - " << iBufNo2 << ")";
775  LOG(info) << " fragments found: " << iFragBeginIgn << " begin, " << iFragEndIgn << " end";
776  if (iFragConc)
777  LOG(info) << " " << iFragConc << " events not concatenated from fragments";
778  }
779 
780  iStatus = 0; // last event request successfull
781  return (pEvt);
782 
783 gEndGet:
784  if ((iError) || (imySig == -1)) {
785  if (iDebug) {
786  LOG(debug) << " RevGet: closing connection to server";
787  }
788  iRC = rclose(&iSocket, 2);
789  if ((iDebug) && (iRC == 0)) {
790  LOG(debug) << " - done";
791  }
792 
793  if (imySig == -1) {
794  iStatus = 5;
795  } // user break (CTL C)
796  else {
797  iStatus = 6;
798  } // failure
799  imySig = 0; // notify CTL C handler
800  } else if (iDebug == 1) {
801  LOG(debug) << " RevGet: keeping connection to server";
802  }
803 
804  return 0;
805 
806 } // RevGet
807 
808 Int_t MRevBuffer::RevBufsize() { return iBufSize; } // RevBufsize
809 
810 void MRevBuffer::RevBufWait(Int_t iWait)
811 {
812  if (iWait > 0) {
813  sleep(iWait);
814  }
815 } // RevBufWait
816 
817 Int_t MRevBuffer::RevStatus(Int_t iOut)
818 {
819  if (iOut)
820  switch (iStatus) {
821  case 0:
822  LOG(info) << "-I- *** Last request for events was successfull";
823  break;
824  case 1:
825  LOG(info) << "-I- *** Remote event server not yet connected";
826  break;
827  case 2:
828  LOG(info) << "-I- *** Remote event server connected, but still no request for events";
829  break;
830  case 3:
831  LOG(info) << "-I- *** Connection to remote event server okay, but currently no DAQ events";
832  break;
833  case 4:
834  LOG(info) << "-I- *** Connection to remote event server closed";
835  break;
836  case 5:
837  LOG(info) << "-I- *** Connection to remote event server closed after user break (CTL C)";
838  break;
839  case 6:
840  LOG(info) << "-I- *** Connection to remote event server closed after failure";
841  break;
842  default:
843  LOG(info) << "-E- Invalid status remote event server found: " << iStatus;
844  }
845  return iStatus;
846 
847 } // RevStatus
848 
849 void MRevBuffer::RevClose(TSocket* pSocket)
850 {
851  int iRC;
852  Int_t* piComm;
853  srevComm sComm;
854  Int_t iCommSize = sizeof(sComm); // size comm. buffer (byte)
855 
856  if (imySig < 0) {
857  return;
858  } // CTL Y: connection closed elsewhere
859  if (iSocket == 0) {
860  return;
861  }
862 
863  // tell server that no more events needed
864  piComm = &(sComm.iSize); // communication buffer
865  sComm.iSize = htonl(iCommSize - sizeof(int)); // size of data following
866  sComm.iMode = htonl(1); // required: get events
867  sComm.iIdent = 1; // required: tell endian type
868  sComm.iBufRequ = htonl(0); // no more event buffers
869 
870  if (iDebug == 1)
871  LOG(debug) << "-D- send close request (data size " << ntohl(sComm.iSize) << " byte): " << ntohl(sComm.iMode)
872  << ", " << ntohl(sComm.iBufRequ);
873 
874  iRC = pSocket->SendRaw(piComm, iCommSize, kDefault);
875  if (iRC < 0)
876  LOG(info) << "-E- sending close request to server, rc = " << iRC;
877  else if (iDebug == 1) {
878  LOG(debug) << " close request sent";
879  }
880 
881  if (iDebug) {
882  LOG(debug) << " RevClose: closing connection to server";
883  }
884  iRC = rclose(&iSocket, 2);
885  if ((iDebug) && (iRC == 0)) {
886  LOG(debug) << " - done";
887  }
888 
889  iStatus = 4; // connection to server closed
890  imySig = 0; // notify CTL C handler
891  LOG(info) << "-I- connection to server closed";
892 
893 } // RevClose
894 
896 
898  : TObject()
899  , iSize(0)
900  , iNumb(0)
901  , piData(nullptr)
902  , nSubEvt(0)
903  , subEvtSize()
904  , subEvtType()
905  , subEvtSubType()
906  , pSubEvt()
907 {
908  // LOG(info) << " REvent() ...";
909  // iNumb = 0;
910  // piData = 0;
911 }
912 
914 {
915  // LOG(info) << " ~REvent() ...";
916 }
917 
918 void REvent::ReFillHead(Int_t* pHead)
919 {
920  iSize = pHead[0]; // event size without header in 2-byte-words
921  iNumb = pHead[1];
922 }
923 
924 void REvent::ReFillData(Int_t* pdata) { piData = pdata; }
925 
926 Int_t REvent::ReGetNumb() { return iNumb; }
927 
928 Int_t REvent::ReGetSize() { return iSize; }
929 
930 Int_t REvent::ReGetData(Int_t iChan)
931 {
932  Int_t iValue;
933  Int_t* pint;
934 
935  // if ( (iChan < 1) || (iChan > iSize/( (signed) sizeof(int))) )
936  if ((iChan < 1) || (iChan > (iSize + 4) / (static_cast<signed>(sizeof(short))))) {
937  LOG(info) << "-E- event parameter number " << iChan << " out of range ("
938  << (iSize + 4) / (static_cast<signed>(sizeof(short))) << " long words)";
939  return (-1);
940  }
941  pint = piData;
942  iValue = pint[iChan - 1];
943  // LOG(info) << " param " << iChan << ": " << iValue;
944  return iValue;
945 }
Short_t subEvtSubType[100]
Definition: MRevBuffer.h:57
int rclose(int *, int)
Definition: rclose.c:25
int iMode
Definition: ptrevcomm.h:30
Int_t RevStatus(Int_t iOut)
Definition: MRevBuffer.cxx:817
void ReFillData(Int_t *pData)
Definition: MRevBuffer.cxx:924
Int_t subEvtSize[100]
Definition: MRevBuffer.h:55
void exitCli()
Definition: exitCli.c:30
int iSize
Definition: ptrevcomm.h:38
short sMbsSev101_type
Definition: ptrevmbsdef.h:102
int iBufRequ
Definition: ptrevcomm.h:32
Int_t * RevGetI(TSocket *pSocket, Int_t iFlush)
Definition: MRevBuffer.cxx:218
int iIdent
Definition: ptrevcomm.h:31
int iHeadPar
Definition: ptrevcomm.h:41
Short_t subEvtProcId[100]
Definition: MRevBuffer.h:58
void RevClose(TSocket *pSocket)
Definition: MRevBuffer.cxx:849
unsigned char cMbsSev101_subcrate
Definition: ptrevmbsdef.h:105
unsigned char cBuf_fragBegin
Definition: ptrevmbsdef.h:66
Short_t subEvtSubCrate[100]
Definition: MRevBuffer.h:59
int iOutMode
Definition: MRevBuffer.cxx:73
Short_t subEvtType[100]
Definition: MRevBuffer.h:56
int iMode
Definition: ptrevcomm.h:39
int iTimeOut
Definition: ptrevcomm.h:42
ClassImp(FairEventBuilder)
Short_t subEvtControl[100]
Definition: MRevBuffer.h:60
int iMbsSev101_dlen
Definition: ptrevmbsdef.h:101
short sMbsSev101_subtype
Definition: ptrevmbsdef.h:103
REvent * RevGet(TSocket *pSocket, Int_t iFlush, Int_t iSkip)
Definition: MRevBuffer.cxx:233
Int_t nSubEvt
Definition: MRevBuffer.h:54
TSocket * RevOpen(char *pNode, Int_t iPort, Int_t iEvent)
Definition: MRevBuffer.cxx:162
Int_t ReGetData(Int_t ichan)
Definition: MRevBuffer.cxx:930
Int_t RevBufsize()
Definition: MRevBuffer.cxx:808
long swaplw(int *, int, int)
void RevBufWait(Int_t iWait)
Definition: MRevBuffer.cxx:810
Int_t ReGetSize()
Definition: MRevBuffer.cxx:928
unsigned char cMbsSev101_control
Definition: ptrevmbsdef.h:106
Int_t * pSubEvt[100]
Definition: MRevBuffer.h:61
int iMbsEv101_dlen
Definition: ptrevmbsdef.h:74
Int_t ReGetNumb()
Definition: MRevBuffer.cxx:926
int imySig
Definition: MRevBuffer.cxx:72
void ReFillHead(Int_t *pHead)
Definition: MRevBuffer.cxx:918
int iTimeOut
Definition: MRevBuffer.cxx:71
unsigned char cBuf_fragEnd
Definition: ptrevmbsdef.h:65
int iSize
Definition: ptrevcomm.h:29
short sMbsSev101_procid
Definition: ptrevmbsdef.h:104