AFHBA404
AFHBA404 connects ACQ2106 to PCI-Express
rtm-t-stream-disk.cpp
Go to the documentation of this file.
1 /* ------------------------------------------------------------------------- */
2 /* rtm-t-stream-disk.cpp RTM-T PCIe Host Side test app */
3 /* ------------------------------------------------------------------------- */
4 /* Copyright (C) 2010 Peter Milne, D-TACQ Solutions Ltd
5  * <Peter dot Milne at D hyphen TACQ dot com>
6 
7  This program is free software; you can redistribute it and/or modify
8  it under the terms of Version 2 of the GNU General Public License
9  as published by the Free Software Foundation;
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program; if not, write to the Free Software
18  Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */
19 /* ------------------------------------------------------------------------- */
20 
21 
29 #include <stdio.h>
30 
31 #include <errno.h>
32 #include <stdlib.h>
33 #include <sys/ioctl.h>
34 #include <sys/mman.h>
35 #include <sys/types.h>
36 #include <sys/stat.h>
37 #include <sys/time.h>
38 
39 
40 
41 #include <sched.h>
42 
43 //using namespace std;
44 
45 #include "RTM_T_Device.h"
46 #include "local.h"
47 #include "popt.h"
48 
49 #include "rtm-t_ioctl.h"
50 /* default implementation is NULL */
51 #include "InlineDataHandler.h"
52 
53 
54 
55 #define DIAG(args...)
56 //#define DIAG(args...) fprintf(stderr, args)
57 
58 /* default: never completes */
59 int MAXITER = 0xffffffff;
60 int MAXITER_MASK = 0x7fffffff;
61 
62 
64 int USLEEP = 0;
65 int VERBOSE = 0;
66 int CONCAT = 0;
67 
68 int SSIZE = sizeof(short) * 96;
69 
70 int acq200_debug = 0;
71 
72 const char* OUTROOT = "/mnt";
73 
74 
75 static RTM_T_Device *dev;
76 
77 int CYCLE;
78 int RECYCLE = 0; /* accumulate by default */
79 int NO_OVERWRITE = 0; /* refuse to allow buffer overwrite */
80 #define WRITE_LEN_ALL -1
81 int WRITE_LEN = WRITE_LEN_ALL; /* bytes written per buffer: WRITE_LEN_ALL writes the whole buffer, 0 writes no data at all */
82 int MAXINT = 999999;
83 int PUT_DATA = 1; /* output name of data buffer, not id */
84 int NBUFS = 0; /* != 0 ? stop after this many buffers */
85 int PUT4KPERFILE = 0; /* if set: bytes put on stdout per file (maxlen/1024, i.e. 4K for a 4MB buffer) so pv can measure at /1024 */
86 int OUTPUT_META = 0;
87 
/** Sequence statistics for the buffer stream, updated by process(). */
struct SEQ {
	unsigned long errors;	/* buffers that did not follow their predecessor */
	unsigned long buffers;	/* buffers checked, i.e. every buffer after the first */
};

/* the single, zero-initialised instance */
struct SEQ SEQ;
93 
94 /* number of buffers to transfer - set on command line or use module knob. */
96 
/**
 * Seconds elapsed since the first call to htime().
 *
 * The first call latches the reference time, so it returns (close to) zero.
 * Time comes from gettimeofday(), i.e. microsecond resolution wall-clock time.
 */
static double htime(void)
{
	static struct timeval start;
	static int started;
	struct timeval now;
	struct timeval delta;

	if (!started){
		gettimeofday(&start, 0);
		started = 1;
	}
	gettimeofday(&now, 0);

	delta.tv_sec = now.tv_sec - start.tv_sec;
	if (now.tv_usec < start.tv_usec){
		/* borrow one second so the microsecond part stays positive */
		delta.tv_sec -= 1;
		delta.tv_usec = now.tv_usec + 1000000 - start.tv_usec;
	}else{
		delta.tv_usec = now.tv_usec - start.tv_usec;
	}

	return delta.tv_sec + (double)(delta.tv_usec)/1000000;
}
120 
121 static int write_meta(int fd, int ibuf, int nbuf)
122 {
123  unsigned long long nsamples = (unsigned long long)nbuf * dev->maxlen / SSIZE;
124 
125  char buf[128];
126  snprintf(buf, 128,
127  "IBUF=%d\n" "NBUF=%d\n" "NSAMPLES=%llu\n" "HTIME=%.3f\n",
128  ibuf, nbuf, nsamples, htime());
129  return write(fd, buf, strlen(buf));
130 }
131 
132 void fail_if_exists(char *buf)
133 {
134  struct stat stat_buf;
135  int rc = stat(buf, &stat_buf);
136  DIAG("stat:rc %d errno %d\n", rc, errno);
137  if (rc == 0){
138  err("OVERRUN: NO_OVERWRITE SET and \"%s\" exists", buf);
139  exit(1);
140  }else if (errno != ENOENT){
141  err("OVERRUN: NO_OVERWRITE SET and \"%s\" exists", buf);
142  perror("buf");
143  exit(errno);
144  }else{
145  ; /* ENOENT - that's good! */
146  }
147 }
148 
149 #define O_MODE (O_WRONLY|O_CREAT|O_TRUNC)
150 #define PERM (S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH)
151 
152 int icat;
153 int outfp;
154 
155 #define OUTROOTFMT "%s/%06d/"
156 #define OUTFMT2 "%s/%06d/%d.%02d"
157 #define OUTFMT3 "%s/%06d/%d.%03d"
158 
159 const char* outfmt;
160 
161 char buf4k[4096];
162 
163 
164 int succ(int ib) {
165  ++ib;
166  return ib >= dev->nbuffers? 0: ib;
167 }
168 
#define MAXHN 1024
/**
 * Name of this host, as reported by gethostname().
 *
 * @return pointer to a static buffer that is refreshed on every call
 */
const char* hostname()
{
	static char name[MAXHN];

	/* offer one byte less than the buffer holds: the final byte of the
	 * zero-initialised static array is never written, so the result is
	 * always terminated.
	 */
	gethostname(name, sizeof(name) - 1);
	return &name[0];
}
176 
177 class Histo {
178  unsigned *hg;
179  const int maxbins;
180 public:
181  Histo(int _maxbins = NELEMS) :
182  hg(new unsigned[_maxbins]),
183  maxbins(_maxbins)
184  {
185  for (int ii = 0; ii < maxbins; ++ii){
186  hg[ii] = 0;
187  }
188  }
189  void print(){
190  for (int ii = 0; ii < maxbins; ++ii){
191  fprintf(stderr, "%5d%c", hg[ii], ii+1 >= maxbins? '\n': ',');
192  }
193  }
194  unsigned operator() (unsigned bin){
195  if (bin >= maxbins){
196  bin = maxbins - 1;
197  }
198  return hg[bin]++;
199  }
200 };
201 
202 static void writeSBD(struct StreamBufferDef* sbd, const char* data_fname)
203 {
204  static FILE *fp;
205  static struct StreamBufferDef old;
206 
207  if (fp == 0){
208  char buf[128];
209  sprintf(buf, "%s/err.log", OUTROOT);
210  fp = fopen(buf, "w");
211  if (fp == 0){
212  fprintf(stderr, "ERROR failed to open file \"%s\"\n", buf);
213  exit(1);
214  }
215  old = *sbd;
216  }
217 
218  fprintf(fp, "%s,%08x,%08x,%d,%d,%d,%s\n",
219  data_fname,
220  sbd->ibuf, sbd->esta,
221  sbd->ibuf&IBUF_IBUF,
222  (sbd->ibuf&IBUF_IDX)>>IBUF_IDX_SHL,
223  sbd->esta&IBUF_IBUF,
224  (sbd->esta&ESTA_CRC)-(old.esta&ESTA_CRC)<2? "OK": "ERR");
225 
226  old = *sbd;
227 }
228 
229 static void process(int ibuf, int nbuf, struct StreamBufferDef *sbd){
230  if (VERBOSE == 1){
231  if (ibuf%10 == 0){
232  fprintf(stderr, "%c", ibuf==0? '\r': '.');
233  fflush(stderr);
234  }
235  }else if (VERBOSE == 2){
236  static int cycle;
237  if (ibuf == 0){
238  fprintf(stderr, "\r%06d", cycle++);
239  fflush(stderr);
240  }
241  }else if (VERBOSE > 2){
242  fprintf(stderr, "%02d\n", ibuf);
243  }
244 
245  static char data_fname[80];
246  static char old_fname[80];
247  char buf[80];
248  static int _ibuf = -1;
249 
250  if (_ibuf != -1){
251  if (succ(_ibuf) != ibuf){
252  SEQ.errors++;
253  if (SEQ.errors < 10){
254  if (succ(succ(_ibuf)) == ibuf || succ(ibuf) == _ibuf){
255  fprintf(stderr, "WARNING potential buffer cycle detected %lu/%lu skip %d > %d\n",
256  SEQ.errors, SEQ.buffers, _ibuf, ibuf);
257  }else{
258  fprintf(stderr, "ERROR: buffer %lu/%lu skip %d -> %d\n",
259  SEQ.errors, SEQ.buffers, _ibuf, ibuf);
260  }
261  }
262  }
263  SEQ.buffers++;
264  }
265  _ibuf = ibuf;
266 
267  if (icat == 0){
268  sprintf(buf, OUTROOTFMT, OUTROOT, CYCLE);
269  mkdir(buf, 0777);
270 
271  sprintf(data_fname, outfmt, OUTROOT, CYCLE,
272  dev->getDevnum(), ibuf);
273 
274  outfp = open(data_fname, O_MODE, PERM);
275  if (outfp == -1){
276  perror(buf);
277  _exit(errno);
278  }
279  writeSBD(sbd, data_fname);
280 
281  if (OUTPUT_META){
282  strcpy(buf, data_fname);
283  strcat(buf, ".id");
284  int out_meta = open(buf, O_MODE, PERM);
285  write_meta(out_meta, ibuf, nbuf);
286  close(out_meta);
287  }
288  }
289  if (NO_OVERWRITE){
290  fail_if_exists(data_fname);
291  }
292 
293  if (WRITE_LEN != 0){
294  int len = WRITE_LEN == WRITE_LEN_ALL? dev->maxlen: WRITE_LEN;
295  int rc = write(outfp, dev->getHostBufferMapping(ibuf), len);
296  if (rc != len){
297  perror("write fail");
298  exit(1);
299  }
300  }
301 
302  if (++icat > CONCAT){
303  close(outfp); /* close data last - we monitor this one */
304  if (PUT_DATA){
305  if (strlen(old_fname)){
306  if (PUT4KPERFILE){
307  strcpy(buf4k, old_fname);
308  strcat(buf4k, "\n");
309  fwrite(buf4k, 1, PUT4KPERFILE, stdout);
310  fflush(stdout);
311  }else{
312  puts(old_fname);
313  }
314  }
315  strcpy(old_fname, data_fname);
316  }
317  icat = 0;
318  }
319 
320  if (USLEEP){
321  usleep(USLEEP);
322  }
323 }
324 
326 {
327  if ((sbd->ibuf&IBUF_MAGIC_MASK) != IBUF_MAGIC){
328  fprintf(stderr, "ERROR NOT IBUF_MAGIC %08x %08x\n",
329  sbd->ibuf, sbd->esta);
330  exit(1);
331  }
332  return sbd->ibuf&IBUF_IBUF;
333 }
334 
335 static int stream()
336 {
337  unsigned iter = 0;
338  struct StreamBufferDef sbd[NELEMS];
339  int fp = dev->getDeviceHandle();
340  int ifirst = MAXINT;
341  int nbuf = 0;
342  Histo backlog(16);
344 
345 
346  int rc = ioctl(fp, RTM_T_START_STREAM_MAX, &transfer_buffers);
347  if (rc != 0){
348  perror("ioctl RTM_T_START_STREAM failed");
349  exit(errno);
350  }
351 
352  while (iter < MAXITER){
353  DIAG("CALLING read\n");
354  int nread = read(fp, sbd, NELEMS*SBDSZ);
355  int ibuf;
356 
357  DIAG(" read returned %d\n", nread);
358 
359  if (nread > 0){
360  int nb = nread/SBDSZ;
361  backlog(nb);
362  for (ibuf = 0; ibuf < nb; ++ibuf){
363  int nwrite = sizeof(int);
364  int bufno = getBufNo(sbd+ibuf);
365 
366  if (bufno <= ifirst ){
367  ifirst = bufno;
368  if (RECYCLE == 0){
369  ++CYCLE;
370  }else{
371  if (++CYCLE >= RECYCLE){
372  CYCLE=0;
373  }
374  }
375  }
376 
377  handler->handleBuffer(ibuf, dev->getHostBufferMapping(ibuf), dev->maxlen);
378 
379  DIAG("CALLING process\n");
380  process(bufno, ++nbuf, sbd+ibuf);
381 
382  dbg(2, "write [%d] %d\n", ibuf, bufno);
383 
384  DIAG("CALLING write\n");
385  if (write(fp, &bufno, nwrite) != nwrite ){
386  perror("write failed");
387  return -1;
388  }
389  if (NBUFS && nbuf >= NBUFS){
390  printf("NBUFS %d reached, quitting now\n", NBUFS);
391  goto all_done;
392  }
393  }
394  }else{
395  perror("read error");
396  goto on_error;
397  }
398 
399  iter = ++iter&MAXITER_MASK;
400  }
401 on_error:
402 all_done:
403  fprintf(stderr, "%s rtm-t-stream-disk finish %lu seq errors in %lu buffers\n", hostname(), SEQ.errors, SEQ.buffers);
404  backlog.print();
405  DIAG("all done\n");
406  return 0;
407 }
408 
409 
410 static void run_stop_monitor(char *monitor)
411 {
412  /* actually, this is unnecessary since capture will stop on timeout
413  * however, it could be useful for a rapid exit.
414  * it's anticipated that monitor is an expect script blocking on the
415  * statemon service from ACQ.
416  */
417  int ppid = getpid();
418 
419  if (fork() == 0){
420  char arg1[20];
421  sprintf(arg1, "%d", ppid);
422  info("monitor: %s %s\n", monitor, arg1);
423  int rc = execlp(monitor, basename(monitor), arg1, (char*)NULL);
424 
425  if (rc){
426  perror("execlp failed");
427  }
428  }
429 }
430 
/**
 * Request SCHED_FIFO scheduling for the calling process.
 *
 * A failure is reported with perror() and is otherwise ignored.
 *
 * @param prio  scheduling priority to request
 */
void setRtPrio(int prio)
{
	struct sched_param prams = {};

	prams.sched_priority = prio;

	if (sched_setscheduler(0, SCHED_FIFO, &prams) != 0){
		perror("sched_setscheduler()");
	}
}
443 
444 
445 static void init_defaults(int argc, char* argv[])
446 {
447  int devnum = 0;
448 
449  if (getenv("RTM_DEVNUM")){
450  devnum = atol(getenv("RTM_DEVNUM"));
451  }
452  dev = new RTM_T_Device(devnum);
453 
454  outfmt = dev->nbuffers > 99? OUTFMT3: OUTFMT2;
455 
456 
457  if (getenv("RTM_MAXITER")){
458  MAXITER = atol(getenv("RTM_MAXITER"));
459  info("MAXITER set %d\n", MAXITER);
460  }
461  if (getenv("RTM_NELEMS")){
462  NELEMS = atol(getenv("RTM_NELEMS"));
463  info("NELEMS set %d\n", NELEMS);
464  }
465  if (getenv("RTM_DEBUG")){
466  acq200_debug = atol(getenv("RTM_DEBUG"));
467  info("DEBUG set %d\n", acq200_debug);
468  }
469  if (getenv("RTM_USLEEP")){
470  USLEEP = atol(getenv("RTM_USLEEP"));
471  info("USLEEP set %d\n", USLEEP);
472  }
473  if (getenv("RTM_VERBOSE")){
474  VERBOSE = atol(getenv("RTM_VERBOSE"));
475  info("VERBOSE set %d\n", VERBOSE);
476  }
477  if (getenv("RTM_PUT_DATA")){
478  PUT_DATA=atoi(getenv("RTM_PUT_DATA"));
479  }
480  if (getenv("SSIZE")){
481  SSIZE = atoi(getenv("SSIZE"));
482  info("SSIZE set %d\n", SSIZE);
483  }
484  if (getenv("RTPRIO")){
485  setRtPrio(atoi(getenv("RTPRIO")));
486  }
487  if (getenv("OUTROOT")){
488  OUTROOT=getenv("OUTROOT");
489  }
490  if (getenv("NBUFS")){
491  NBUFS=atoi(getenv("NBUFS"));
492  }
493  if (getenv("RECYCLE")){
494  RECYCLE=atol(getenv("RECYCLE"));
495  }
496  if (getenv("WRITE_LEN")){
497  WRITE_LEN = atol(getenv("WRITE_LEN"));
498  }
499  if (getenv("NO_OVERWRITE")){
500  NO_OVERWRITE=atol(getenv("NO_OVERWRITE"));
501  }
502  if (getenv("CONCAT")){
503  CONCAT=atoi(getenv("CONCAT"));
504  }
505  if (getenv("PUT4KPERFILE")){
506  PUT4KPERFILE = dev->maxlen/1024;
507  info("PUT4KPERFILE maxlen %x", dev->maxlen);
508  }
509  if (getenv("OUTPUT_META")){
510  OUTPUT_META = atoi(getenv("OUTPUT_META"));
511  }
512  setvbuf(stdout, 0, _IOLBF, 0);
513 
514  char* monitor;
515  if ((monitor = getenv("KILL_ON_STOP")) != 0){
516  run_stop_monitor(monitor);
517  }
518 
520  if (argc > 1){
521  unsigned tb = strtoul(argv[1], 0, 0);
522  if (tb > 0){
523  transfer_buffers = tb;
524  }else{
525  err("stream-disk [NBUFFERS]");
526  }
527  }
528  info("streaming %u buffers", transfer_buffers);
529 }
530 
531 int main(int argc, char* argv[])
532 {
533  init_defaults(argc, argv);
534  return stream();
535 }
RECYCLE
int RECYCLE
Definition: rtm-t-stream-disk.cpp:78
NO_OVERWRITE
int NO_OVERWRITE
Definition: rtm-t-stream-disk.cpp:79
InlineDataHandler::handleBuffer
virtual void handleBuffer(int ibuf, const void *src, int len)
Definition: InlineDataHandler.h:19
InlineDataHandler.h
SEQ::errors
unsigned long errors
Definition: rtm-t-stream-disk.cpp:89
InlineDataHandler::factory
static InlineDataHandler * factory(RTM_T_Device *ai_dev)
Definition: InlineDataHandler.cpp:20
SBDSZ
#define SBDSZ
Definition: rtm-t_ioctl.h:127
RTM_T_Device::nbuffers
const unsigned nbuffers
Definition: RTM_T_Device.h:45
local.h
PUT_DATA
int PUT_DATA
Definition: rtm-t-stream-disk.cpp:83
RTM_T_Device::getHostBufferMapping
const void * getHostBufferMapping(int ibuf=0)
Definition: RTM_T_Device.h:62
info
#define info(format, arg...)
Definition: local.h:118
acq200_debug
int acq200_debug
Definition: rtm-t-stream-disk.cpp:70
PERM
#define PERM
Definition: rtm-t-stream-disk.cpp:150
IBUF_IBUF
#define IBUF_IBUF
Definition: rtm-t_ioctl.h:125
IBUF_IDX_SHL
#define IBUF_IDX_SHL
Definition: rtm-t_ioctl.h:124
WRITE_LEN
int WRITE_LEN
Definition: rtm-t-stream-disk.cpp:81
MAXITER_MASK
int MAXITER_MASK
Definition: rtm-t-stream-disk.cpp:60
StreamBufferDef::ibuf
u32 ibuf
Definition: rtm-t_ioctl.h:118
SEQ::buffers
unsigned long buffers
Definition: rtm-t-stream-disk.cpp:90
OUTROOT
const char * OUTROOT
Definition: rtm-t-stream-disk.cpp:72
InlineDataHandler
Definition: InlineDataHandler.h:13
outfp
int outfp
Definition: rtm-t-stream-disk.cpp:153
SSIZE
int SSIZE
Definition: rtm-t-stream-disk.cpp:68
OUTFMT2
#define OUTFMT2
Definition: rtm-t-stream-disk.cpp:156
ESTA_CRC
#define ESTA_CRC
Definition: rtm-t_ioctl.h:126
outfmt
const char * outfmt
Definition: rtm-t-stream-disk.cpp:159
RTM_T_Device::getDevnum
int getDevnum(void) const
Definition: RTM_T_Device.h:72
SEQ
Definition: rtm-t-stream-disk.cpp:88
RTM_T_Device::maxlen
const unsigned maxlen
Definition: RTM_T_Device.h:46
Histo::operator()
unsigned operator()(unsigned bin)
Definition: rtm-t-stream-disk.cpp:194
hostname
const char * hostname()
Definition: rtm-t-stream-disk.cpp:170
StreamBufferDef::esta
u32 esta
Definition: rtm-t_ioctl.h:119
RTM_T_START_STREAM_MAX
#define RTM_T_START_STREAM_MAX
ioctl Start High Throughput Streaming specify max buffers.
Definition: rtm-t_ioctl.h:85
RTM_T_Device::getDeviceHandle
const int getDeviceHandle(void)
Definition: RTM_T_Device.h:56
OUTFMT3
#define OUTFMT3
Definition: rtm-t-stream-disk.cpp:157
Histo::print
void print()
Definition: rtm-t-stream-disk.cpp:189
rtm-t_ioctl.h
getBufNo
int getBufNo(StreamBufferDef *sbd)
Definition: rtm-t-stream-disk.cpp:325
NBUFS
int NBUFS
Definition: rtm-t-stream-disk.cpp:84
err
#define err(format, arg...)
Definition: local.h:121
ib
int ib
Definition: InlineDataHandlerMuxAO_LLC.cpp:57
buf4k
char buf4k[4096]
Definition: rtm-t-stream-disk.cpp:161
DIAG
#define DIAG(args...)
Definition: rtm-t-stream-disk.cpp:55
NELEMS
int NELEMS
Definition: rtm-t-stream-disk.cpp:63
RTM_T_Device::MAXBUF
@ MAXBUF
Definition: RTM_T_Device.h:80
fail_if_exists
void fail_if_exists(char *buf)
Definition: rtm-t-stream-disk.cpp:132
IBUF_IDX
#define IBUF_IDX
Definition: rtm-t_ioctl.h:123
MAXITER
int MAXITER
Definition: rtm-t-stream-disk.cpp:59
rtm-t-stream.stream
stream
Definition: rtm-t-stream.py:44
MAXINT
int MAXINT
Definition: rtm-t-stream-disk.cpp:82
USLEEP
int USLEEP
Definition: rtm-t-stream-disk.cpp:64
WRITE_LEN_ALL
#define WRITE_LEN_ALL
Definition: rtm-t-stream-disk.cpp:80
MAXHN
#define MAXHN
Definition: rtm-t-stream-disk.cpp:169
main
int main(int argc, char *argv[])
Definition: rtm-t-stream-disk.cpp:531
setRtPrio
void setRtPrio(int prio)
Definition: rtm-t-stream-disk.cpp:431
Histo
Definition: rtm-t-stream-disk.cpp:177
dbg
#define dbg(lvl, format, arg...)
Definition: local.h:124
icat
int icat
Definition: rtm-t-stream-disk.cpp:152
RTM_T_Device::transfer_buffers
const unsigned transfer_buffers
Definition: RTM_T_Device.h:47
transfer_buffers
unsigned transfer_buffers
Definition: rtm-t-stream-disk.cpp:95
RTM_T_Device.h
VERBOSE
int VERBOSE
Definition: rtm-t-stream-disk.cpp:65
RTM_T_Device
Definition: RTM_T_Device.h:17
CYCLE
int CYCLE
Definition: rtm-t-stream-disk.cpp:77
IBUF_MAGIC_MASK
#define IBUF_MAGIC_MASK
Definition: rtm-t_ioctl.h:122
getenv
int getenv(const char *key, int def, int(*cvt)(const char *key))
Definition: acqproc.cpp:23
StreamBufferDef
Definition: rtm-t_ioctl.h:117
CONCAT
int CONCAT
Definition: rtm-t-stream-disk.cpp:66
G::nsamples
int nsamples
samples to capture (default:2, typ 200000)
Definition: acqproc.cpp:38
IBUF_MAGIC
#define IBUF_MAGIC
Definition: rtm-t_ioctl.h:121
OUTPUT_META
int OUTPUT_META
Definition: rtm-t-stream-disk.cpp:86
O_MODE
#define O_MODE
Definition: rtm-t-stream-disk.cpp:149
OUTROOTFMT
#define OUTROOTFMT
Definition: rtm-t-stream-disk.cpp:155
succ
int succ(int ib)
Definition: rtm-t-stream-disk.cpp:164
SEQ
struct SEQ SEQ
PUT4KPERFILE
int PUT4KPERFILE
Definition: rtm-t-stream-disk.cpp:85
Histo::Histo
Histo(int _maxbins=NELEMS)
Definition: rtm-t-stream-disk.cpp:181