00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include <stdio.h>
00017 #include "midas.h"
00018 #include "mevb.h"
00019 #include "msystem.h"
00020 #include "ybos.h"
00021
00022 #define SERVER_CACHE_SIZE 100000
00023
00024 #define ODB_UPDATE_TIME 1000
00025
00026 #define DEFAULT_FE_TIMEOUT 60000
00027
00028 EBUILDER_SETTINGS ebset;
00029 EBUILDER_CHANNEL ebch[MAX_CHANNELS];
00030
00031 INT run_state;
00032 INT run_number;
00033 DWORD last_time;
00034 DWORD actual_time;
00035 DWORD actual_millitime;
00036
00037 char host_name[HOST_NAME_LENGTH];
00038 char expt_name[NAME_LENGTH];
00039 char full_frontend_name[256];
00040 char buffer_name[NAME_LENGTH];
00041 INT nfragment;
00042 char *dest_event;
00043 HNDLE hDB, hKey, hStatKey, hSubkey, hEqKey, hESetKey;
00044 BOOL debug = FALSE, debug1 = FALSE;
00045
00046 BOOL wheel = FALSE;
00047 char bars[] = "|\\-/";
00048 int i_bar;
00049 BOOL abort_requested = FALSE, stop_requested = TRUE;
00050 DWORD stop_time = 0, request_stop_time = 0;
00051
00052 INT(*meb_fragment_add) (char *, char *, INT *);
00053 INT handFlush(void);
00054 INT source_booking(void);
00055 INT source_unbooking(void);
00056 INT close_buffers(void);
00057 INT source_scan(INT fmt, EQUIPMENT_INFO *eq_info);
00058 INT eb_mfragment_add(char *pdest, char *psrce, INT * size);
00059 INT eb_yfragment_add(char *pdest, char *psrce, INT * size);
00060
00061 INT eb_begin_of_run(INT, char *, char *);
00062 INT eb_end_of_run(INT, char *);
00063 INT eb_user(INT, BOOL mismatch, EBUILDER_CHANNEL *, EVENT_HEADER *, void *, INT *);
00064 INT load_fragment(void);
00065 INT scan_fragment(void);
00066 extern char *frontend_name;
00067 extern char *frontend_file_name;
00068 extern BOOL frontend_call_loop;
00069
00070 extern INT max_event_size;
00071 extern INT max_event_size_frag;
00072 extern INT event_buffer_size;
00073 extern INT display_period;
00074 extern INT ebuilder_init(void);
00075 extern INT ebuilder_exit(void);
00076 extern INT ebuilder_loop(void);
00077
00078 extern EQUIPMENT equipment[];
00079 extern INT ybos_event_swap(DWORD * pevt);
00080
00081 #define EQUIPMENT_COMMON_STR "\
00082 Event ID = WORD : 0\n\
00083 Trigger mask = WORD : 0\n\
00084 Buffer = STRING : [32] SYSTEM\n\
00085 Type = INT : 0\n\
00086 Source = INT : 0\n\
00087 Format = STRING : [8] FIXED\n\
00088 Enabled = BOOL : 0\n\
00089 Read on = INT : 0\n\
00090 Period = INT : 0\n\
00091 Event limit = DOUBLE : 0\n\
00092 Num subevents = DWORD : 0\n\
00093 Log history = INT : 0\n\
00094 Frontend host = STRING : [32] \n\
00095 Frontend name = STRING : [32] \n\
00096 Frontend file name = STRING : [256] \n\
00097 "
00098
00099 #define EQUIPMENT_STATISTICS_STR "\
00100 Events sent = DOUBLE : 0\n\
00101 Events per sec. = DOUBLE : 0\n\
00102 kBytes per sec. = DOUBLE : 0\n\
00103 "
00104
00105
00106 INT register_equipment(void)
00107 {
00108 INT index, size, status;
00109 char str[256];
00110 EQUIPMENT_INFO *eq_info;
00111 EQUIPMENT_STATS *eq_stats;
00112 HNDLE hKey;
00113
00114
00115 size = sizeof(run_state);
00116 run_state = STATE_STOPPED;
00117 db_get_value(hDB, 0, "/Runinfo/State", &run_state, &size, TID_INT, TRUE);
00118 size = sizeof(run_number);
00119 run_number = 1;
00120 status = db_get_value(hDB, 0, "/Runinfo/Run number", &run_number, &size, TID_INT, TRUE);
00121 assert(status == SUCCESS);
00122
00123
00124 for (index = 0; equipment[index].name[0]; index++) {
00125 eq_info = &equipment[index].info;
00126 eq_stats = &equipment[index].stats;
00127
00128 if (eq_info->event_id == 0) {
00129 printf("\nEvent ID 0 for %s not allowed\n", equipment[index].name);
00130 cm_disconnect_experiment();
00131 ss_sleep(5000);
00132 exit(0);
00133 }
00134
00135
00136 equipment[index].status = EB_SUCCESS;
00137
00138 sprintf(str, "/Equipment/%s/Common", equipment[index].name);
00139
00140
00141 if (eq_info->eq_type != EQ_SLOW) {
00142 db_find_key(hDB, 0, str, &hKey);
00143 size = sizeof(double);
00144 if (hKey)
00145 db_get_value(hDB, hKey, "Event limit", &eq_info->event_limit, &size,
00146 TID_DOUBLE, TRUE);
00147 }
00148
00149
00150 status = db_check_record(hDB, 0, str, EQUIPMENT_COMMON_STR, TRUE);
00151 if (status != DB_SUCCESS) {
00152 printf("Cannot check equipment record, status = %d\n", status);
00153 ss_sleep(3000);
00154 }
00155 db_find_key(hDB, 0, str, &hKey);
00156
00157 if (equal_ustring(eq_info->format, "YBOS"))
00158 equipment[index].format = FORMAT_YBOS;
00159 else if (equal_ustring(eq_info->format, "FIXED"))
00160 equipment[index].format = FORMAT_FIXED;
00161 else
00162 equipment[index].format = FORMAT_MIDAS;
00163
00164 gethostname(eq_info->frontend_host, sizeof(eq_info->frontend_host));
00165 strcpy(eq_info->frontend_name, full_frontend_name);
00166 strcpy(eq_info->frontend_file_name, frontend_file_name);
00167
00168
00169 db_set_record(hDB, hKey, eq_info, sizeof(EQUIPMENT_INFO), 0);
00170
00171
00172 size = sizeof(EQUIPMENT_INFO);
00173 db_get_record(hDB, hKey, eq_info, &size, 0);
00174
00175
00176 sprintf(str, "/Equipment/%s/Variables", equipment[index].name);
00177 db_create_key(hDB, 0, str, TID_KEY);
00178 db_find_key(hDB, 0, str, &hKey);
00179 equipment[index].hkey_variables = hKey;
00180
00181
00182 sprintf(str, "/Equipment/%s/Statistics", equipment[index].name);
00183
00184 status = db_check_record(hDB, 0, str, EQUIPMENT_STATISTICS_STR, TRUE);
00185 if (status != DB_SUCCESS) {
00186 printf("Cannot create/check statistics record, error %d\n", status);
00187 ss_sleep(3000);
00188 }
00189
00190 status = db_find_key(hDB, 0, str, &hKey);
00191 if (status != DB_SUCCESS) {
00192 printf("Cannot find statistics record, error %d\n", status);
00193 ss_sleep(3000);
00194 }
00195
00196 eq_stats->events_sent = 0;
00197 eq_stats->events_per_sec = 0;
00198 eq_stats->kbytes_per_sec = 0;
00199
00200
00201 status = db_open_record(hDB, hKey, eq_stats, sizeof(EQUIPMENT_STATS)
00202 , MODE_WRITE, NULL, NULL);
00203 if (status != DB_SUCCESS) {
00204 cm_msg(MERROR, "register_equipment",
00205 "Cannot open statistics record, error %d. Probably other FE is using it",
00206 status);
00207 ss_sleep(3000);
00208 }
00209
00210
00211 if (eq_info->buffer[0]) {
00212 status = bm_open_buffer(eq_info->buffer, EVENT_BUFFER_SIZE,
00213 &equipment[index].buffer_handle);
00214 if (status != BM_SUCCESS && status != BM_CREATED) {
00215 cm_msg(MERROR, "register_equipment",
00216 "Cannot open event buffer. Try to reduce EVENT_BUFFER_SIZE in midas.h \
00217 and rebuild the system.");
00218 return 0;
00219 }
00220
00221
00222 bm_set_cache_size(equipment[index].buffer_handle, 0, SERVER_CACHE_SIZE);
00223 } else {
00224 cm_msg(MERROR, "register_equipment", "Destination buffer must be present");
00225 ss_sleep(3000);
00226 exit(0);
00227 }
00228 }
00229 return SUCCESS;
00230 }
00231
00232
00233 INT load_fragment(void)
00234 {
00235 INT i, size, type;
00236 HNDLE hEqKey, hSubkey;
00237 EQUIPMENT_INFO *eq_info;
00238 KEY key;
00239 char buffer[NAME_LENGTH];
00240 char format[8];
00241
00242
00243 eq_info = &equipment[0].info;
00244
00245
00246 if (db_find_key(hDB, 0, "Equipment", &hEqKey) != DB_SUCCESS) {
00247 cm_msg(MINFO, "load_fragment", "Equipment listing not found");
00248 return EB_ERROR;
00249 }
00250
00251
00252 for (i = 0, nfragment=0 ; ; i++) {
00253 db_enum_key(hDB, hEqKey, i, &hSubkey);
00254 if (!hSubkey)
00255 break;
00256 db_get_key(hDB, hSubkey, &key);
00257 if (key.type == TID_KEY) {
00258
00259 if (debug) printf("Equipment name:%s\n", key.name);
00260
00261 size = sizeof(INT);
00262 db_get_value(hDB, hSubkey, "common/type", &type, &size, TID_INT, 0);
00263 size = sizeof(buffer);
00264 db_get_value(hDB, hSubkey, "common/Buffer", buffer, &size, TID_STRING, 0);
00265 size = sizeof(format);
00266 db_get_value(hDB, hSubkey, "common/Format", format, &size, TID_STRING, 0);
00267
00268 if ((type & EQ_EB)
00269 && (strncmp(buffer, buffer_name, strlen(buffer_name)) == 0)
00270 && (strncmp(format, eq_info->format, strlen(format)) == 0)) {
00271
00272 strcpy(ebch[nfragment].format, format);
00273 strcpy(ebch[nfragment].buffer, buffer);
00274 size = sizeof(WORD);
00275 db_get_value(hDB, hSubkey, "common/Trigger Mask", &ebch[nfragment].trigger_mask, &size, TID_WORD, 0);
00276 size = sizeof(WORD);
00277 db_get_value(hDB, hSubkey, "common/Event ID", &ebch[nfragment].event_id, &size, TID_WORD, 0);
00278 nfragment++;
00279 }
00280 }
00281 }
00282
00283 printf("Found %d fragment matching EB setting\n", nfragment);
00284
00285
00286 if (equipment[0].format == FORMAT_MIDAS)
00287 meb_fragment_add = eb_mfragment_add;
00288 else if (equipment[0].format == FORMAT_YBOS)
00289 meb_fragment_add = eb_yfragment_add;
00290 else {
00291 cm_msg(MERROR, "mevb", "Unknown data format :%d", format);
00292 return EB_ERROR;
00293 }
00294
00295
00296 dest_event = (char *) malloc(nfragment * (max_event_size + sizeof(EVENT_HEADER)));
00297 memset(dest_event, 0, nfragment * (max_event_size + sizeof(EVENT_HEADER)));
00298 if (dest_event == NULL) {
00299 cm_msg(MERROR, "EBuilder", "%s: Not enough memory for event buffer", full_frontend_name);
00300 return EB_ERROR;
00301 }
00302 return EB_SUCCESS;
00303 }
00304
00305
00306 INT scan_fragment(void)
00307 {
00308 INT fragn, status;
00309 EQUIPMENT *eq;
00310 EQUIPMENT_INFO *eq_info;
00311
00312
00313 eq_info = &equipment[0].info;
00314
00315
00316 do {
00317 switch (run_state) {
00318 case STATE_STOPPED:
00319 case STATE_PAUSED:
00320
00321 status = cm_yield(500);
00322 if (wheel) {
00323 printf("...%c Snoring\r", bars[i_bar++ % 4]);
00324 fflush(stdout);
00325 }
00326 break;
00327 case STATE_RUNNING:
00328 status = source_scan(equipment[0].format, eq_info);
00329 switch (status) {
00330 case BM_ASYNC_RETURN:
00331 for (fragn = 0; fragn < nfragment; fragn++) {
00332 if (ebch[fragn].timeout > TIMEOUT) {
00333 if (stop_requested) {
00334 if (debug) printf("Stop requested on timeout %d\n", status);
00335 status = close_buffers();
00336 break;
00337 }
00338 else {
00339 if (wheel) {
00340 printf("...%c Timoing on %1.0lf\r", bars[i_bar++ % 4],
00341 eq->stats.events_sent);
00342 fflush(stdout);
00343 status = cm_yield(50);
00344 }
00345 }
00346 }
00347
00348 }
00349 break;
00350 case EB_ERROR:
00351 case EB_USER_ERROR:
00352 abort_requested = TRUE;
00353 if (status == EB_USER_ERROR)
00354 cm_msg(MTALK, "EBuilder", "%s: Error signaled by user code - stopping run...", full_frontend_name);
00355 else
00356 cm_msg(MTALK, "EBuilder", "%s: Event mismatch - Stopping run...", full_frontend_name);
00357 if (cm_transition(TR_STOP, 0, NULL, 0, ASYNC, 0) != CM_SUCCESS) {
00358 cm_msg(MERROR, "EBuilder", "%s: Stop Transition request failed", full_frontend_name);
00359 return status;
00360 }
00361 if (debug) printf("Stop requested on Error %d\n", status);
00362 status = close_buffers();
00363 return status;
00364 break;
00365 case EB_SUCCESS:
00366 case EB_SKIP:
00367
00368
00369 break;
00370 default:
00371 cm_msg(MERROR, "Source_scan", "unexpected return %d", status);
00372 status = SS_ABORT;
00373 }
00374 break;
00375 }
00376
00377
00378 if ((actual_millitime = ss_millitime()) - last_time > 1000) {
00379
00380 rpc_flush_event();
00381
00382 bm_flush_cache(equipment[0].buffer_handle, ASYNC);
00383
00384 status = cm_yield(10);
00385
00386 eq = &equipment[0];
00387 eq->stats.events_sent += eq->events_sent;
00388 eq->stats.events_per_sec =
00389 eq->events_sent / ((actual_millitime - last_time) / 1000.0);
00390 eq->stats.kbytes_per_sec =
00391 eq->bytes_sent / 1024.0 / ((actual_millitime - last_time) /
00392 1000.0);
00393 eq->bytes_sent = 0;
00394 eq->events_sent = 0;
00395
00396 db_send_changed_records();
00397
00398 last_time = ss_millitime();
00399 }
00400 } while (status != RPC_SHUTDOWN && status != SS_ABORT);
00401
00402 return status;
00403 }
00404
00405
00406 INT eb_mfragment_add(char *pdest, char *psrce, INT * size)
00407 {
00408 BANK_HEADER *psbh, *pdbh;
00409 char *psdata, *pddata;
00410 INT bksize;
00411
00412
00413 *size = ((EVENT_HEADER *) pdest)->data_size;
00414
00415
00416 pddata = pdest + *size + sizeof(EVENT_HEADER);
00417
00418 if (*size) {
00419
00420
00421
00422 psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
00423 bk_swap(psbh, FALSE);
00424
00425
00426 psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
00427 psdata = (char *) (psbh + 1);
00428
00429
00430 bksize = psbh->data_size;
00431
00432
00433 memcpy(pddata, psdata, bksize);
00434
00435
00436 ((EVENT_HEADER *) pdest)->data_size += bksize;
00437
00438
00439 pdbh = (BANK_HEADER *) (((EVENT_HEADER *) pdest) + 1);
00440 pdbh->data_size += bksize;
00441
00442 *size = ((EVENT_HEADER *) pdest)->data_size;
00443 } else {
00444
00445
00446 *size = ((EVENT_HEADER *) psrce)->data_size;
00447
00448
00449 psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
00450 bk_swap(psbh, FALSE);
00451
00452
00453 memcpy(pddata, psbh, *size);
00454
00455
00456 ((EVENT_HEADER *) pdest)->data_size = *size;
00457 }
00458 return CM_SUCCESS;
00459 }
00460
00461
00462 INT eb_yfragment_add(char *pdest, char *psrce, INT * size)
00463 {
00464
00465
00466
00467
00468 char *psdata, *pddata;
00469 DWORD *pslrl, *pdlrl;
00470 INT i4frgsize, i1frgsize, status;
00471
00472
00473 *size = ((EVENT_HEADER *) pdest)->data_size;
00474
00475
00476
00477 pddata = pdest + *size + sizeof(EVENT_HEADER);
00478
00479
00480 if (*size) {
00481
00482
00483 pslrl = (DWORD *) (((EVENT_HEADER *) psrce) + 1);
00484
00485
00486 status = ybos_event_swap(pslrl);
00487
00488
00489 psdata = (char *) (pslrl + 1);
00490
00491
00492 i4frgsize = (*pslrl);
00493 i1frgsize = 4 * i4frgsize;
00494
00495
00496 memcpy(pddata, psdata, i1frgsize);
00497
00498
00499 ((EVENT_HEADER *) pdest)->data_size += i1frgsize;
00500
00501
00502 pdlrl = (DWORD *) (((EVENT_HEADER *) pdest) + 1);
00503 *pdlrl += i4frgsize;
00504
00505
00506 *size = ((EVENT_HEADER *) pdest)->data_size;
00507 } else {
00508
00509
00510
00511
00512
00513
00514
00515 pslrl = (DWORD *) (((EVENT_HEADER *) psrce) + 1);
00516
00517
00518 status = ybos_event_swap(pslrl);
00519
00520
00521 *size = ((EVENT_HEADER *) psrce)->data_size;
00522
00523
00524 memcpy(pddata, (char *) pslrl, *size);
00525
00526
00527 ((EVENT_HEADER *) pdest)->data_size += *size;
00528
00529 }
00530 return CM_SUCCESS;
00531 }
00532
00533
00534 INT tr_start(INT rn, char *error)
00535 {
00536 EBUILDER(ebuilder_str);
00537 INT status, size, i;
00538 char str[128];
00539 KEY key;
00540 HNDLE hKey, hEqkey, hEqFRkey;
00541 EQUIPMENT_INFO *eq_info;
00542
00543
00544 eq_info = &equipment[0].info;
00545
00546
00547 sprintf(str, "/Equipment/%s/Common", equipment[0].name);
00548 status = db_find_key(hDB, 0, str, &hKey);
00549 size = sizeof(EQUIPMENT_INFO);
00550 db_get_record(hDB, hKey, eq_info, &size, 0);
00551
00552 ebset.nfragment = nfragment;
00553
00554
00555 for (i = 0; equipment[i].name[0]; i++) {
00556 equipment[i].serial_number = 1;
00557 equipment[i].subevent_number = 0;
00558 equipment[i].stats.events_sent = 0;
00559 equipment[i].odb_in = equipment[i].odb_out = 0;
00560 }
00561
00562
00563 sprintf(str, "/Equipment/%s/Settings", equipment[0].name);
00564 if (db_find_key(hDB, 0, str, &hEqkey) != DB_SUCCESS) {
00565 status = db_create_record(hDB, 0, str, strcomb(ebuilder_str));
00566 }
00567
00568
00569 sprintf(str, "/Equipment/%s/Settings", equipment[0].name);
00570 if (db_find_key(hDB, 0, str, &hEqkey) != DB_SUCCESS) {
00571 cm_msg(MINFO, "load_fragment", "/Equipment/%s/Settings not found", equipment[0].name);
00572 }
00573
00574
00575 size = sizeof(ebset.user_field);
00576 status = db_get_value(hDB, hEqkey, "User Field", ebset.user_field, &size, TID_STRING, TRUE);
00577
00578
00579 size = sizeof(ebset.user_build);
00580 status = db_get_value(hDB, hEqkey, "User Build", &ebset.user_build, &size, TID_BOOL, TRUE);
00581
00582
00583 size = sizeof(INT);
00584 status = db_set_value(hDB, hEqkey, "Number of Fragment", &ebset.nfragment, size, 1, TID_INT);
00585
00586
00587 status = db_find_key(hDB, hEqkey, "Fragment Required", &hEqFRkey);
00588 status = db_get_key (hDB, hEqFRkey, &key);
00589 if (key.num_values != ebset.nfragment) {
00590 cm_msg(MINFO, "mevb", "Number of Fragment mismatch ODB:%d - CUR:%d", key.num_values, ebset.nfragment);
00591 free (ebset.preqfrag);
00592 size = ebset.nfragment*sizeof(BOOL);
00593 ebset.preqfrag = malloc(size);
00594 for (i=0 ; i<ebset.nfragment ; i++)
00595 ebset.preqfrag[i] = TRUE;
00596 status = db_set_value(hDB, hEqkey, "Fragment Required", ebset.preqfrag, size, ebset.nfragment, TID_BOOL);
00597 } else {
00598 size = key.total_size;
00599 free (ebset.preqfrag);
00600 ebset.preqfrag = malloc(size);
00601 status = db_get_data(hDB, hEqFRkey, ebset.preqfrag, &size, TID_BOOL);
00602 }
00603
00604 free (ebset.received);
00605 ebset.received = malloc(size);
00606 for (i=0 ; i < ebset.nfragment ; i++)
00607 ebset.received[i] = FALSE;
00608
00609
00610 status = eb_begin_of_run(run_number, ebset.user_field, error);
00611 if (status != EB_SUCCESS) {
00612 cm_msg(MERROR, "eb_prestart", "run start aborted due to eb_begin_of_run (%d)",
00613 status);
00614 return status;
00615 }
00616
00617
00618 status = source_booking();
00619 if (status != SUCCESS)
00620 return status;
00621
00622 if (!eq_info->enabled) {
00623 cm_msg(MINFO,"ebuilder", "Event Builder disabled");
00624 return CM_SUCCESS;
00625 }
00626
00627
00628 run_state = STATE_RUNNING;
00629 run_number = rn;
00630 stop_requested = FALSE;
00631 abort_requested = FALSE;
00632 printf("%s-Starting New Run: %d\n", full_frontend_name, rn);
00633
00634
00635 return CM_SUCCESS;
00636 }
00637
00638
00639 INT tr_stop(INT rn, char *error)
00640 {
00641 printf("\n%s-Stopping Run: %d detected\n", full_frontend_name, rn);
00642
00643
00644 stop_requested = TRUE;
00645
00646
00647 request_stop_time = ss_millitime();
00648 return CM_SUCCESS;
00649 }
00650
00651
00652 void free_event_buffer(INT nfrag)
00653 {
00654 INT i;
00655 for (i = 0; i < nfrag; i++) {
00656 if (ebch[i].pfragment) {
00657 free(ebch[i].pfragment);
00658 ebch[i].pfragment = NULL;
00659 }
00660 }
00661 }
00662
00663
00664 INT handFlush()
00665 {
00666 int i, size, status;
00667 char strout[256];
00668
00669
00670 if (debug)
00671 printf("Hand flushing system buffer... \n");
00672 for (i = 0; i < nfragment; i++) {
00673 do {
00674 if (ebset.preqfrag[i]) {
00675 size = max_event_size;
00676 status = bm_receive_event(ebch[i].hBuf, ebch[i].pfragment, &size, ASYNC);
00677 if (debug1) {
00678 sprintf(strout,
00679 "booking:Hand flush bm_receive_event[%d] hndle:%d stat:%d Last Ser:%d",
00680 i, ebch[i].hBuf, status,
00681 ((EVENT_HEADER *) ebch[i].pfragment)->serial_number);
00682 printf("%s\n", strout);
00683 }
00684 }
00685 } while (status == BM_SUCCESS);
00686 }
00687
00688
00689 status = bm_empty_buffers();
00690 if (status != BM_SUCCESS)
00691 cm_msg(MERROR, "handFlush", "bm_empty_buffers failure [%d]", status);
00692 run_state = STATE_STOPPED;
00693 return status;
00694 }
00695
00696
00697
00698 INT source_booking()
00699 {
00700 INT j, i, status, status1, status2;
00701
00702 if (debug)
00703 printf("Entering booking\n");
00704
00705
00706 for (i = 0; i < nfragment; i++) {
00707
00708 if (ebset.preqfrag[i]) {
00709
00710 status1 = bm_open_buffer(ebch[i].buffer, EVENT_BUFFER_SIZE, &(ebch[i].hBuf));
00711
00712 if (debug)
00713 printf("bm_open_buffer frag:%d buf:%s handle:%d stat:%d\n",
00714 i, ebch[i].buffer, ebch[i].hBuf, status1);
00715
00716 status2 =
00717 bm_request_event(ebch[i].hBuf, ebch[i].event_id,
00718 ebch[i].trigger_mask, GET_ALL, &ebch[i].req_id, NULL);
00719 if (debug)
00720 printf("bm_request_event frag:%d id:%d msk:%d req_id:%d stat:%d\n",
00721 i, ebch[i].event_id, ebch[i].trigger_mask, ebch[i].req_id, status2);
00722 if (((status1 != BM_SUCCESS) && (status1 != BM_CREATED)) ||
00723 ((status2 != BM_SUCCESS) && (status2 != BM_CREATED))) {
00724 cm_msg(MERROR, "source_booking",
00725 "Open buffer/event request failure [%d %d %d]", i, status1, status2);
00726 return BM_CONFLICT;
00727 }
00728
00729
00730 if (ebch[i].pfragment)
00731 free(ebch[i].pfragment);
00732 ebch[i].pfragment = (char *) malloc(max_event_size + sizeof(EVENT_HEADER));
00733 if (debug)
00734 printf("malloc pevent frag:%d pevent:%p\n", i, ebch[i].pfragment);
00735 if (ebch[i].pfragment == NULL) {
00736 free_event_buffer(nfragment);
00737 cm_msg(MERROR, "source_booking", "Can't allocate space for buffer");
00738 return BM_NO_MEMORY;
00739 }
00740 }
00741 }
00742
00743
00744 status = bm_empty_buffers();
00745 if (status != BM_SUCCESS) {
00746 cm_msg(MERROR, "source_booking", "bm_empty_buffers failure [%d]", status);
00747 return status;
00748 }
00749
00750 if (debug) {
00751 printf("bm_empty_buffers stat:%d\n", status);
00752 for (j = 0; j < ebset.nfragment; j++) {
00753 printf(" buff:%s", ebch[j].buffer);
00754 printf(" ser#:%d", ebch[j].serial);
00755 printf(" hbuf:%2d", ebch[j].hBuf);
00756 printf(" rqid:%2d", ebch[j].req_id);
00757 printf(" opst:%d", status1);
00758 printf(" rqst:%d", status2);
00759 printf(" evid:%2d", ebch[j].event_id);
00760 printf(" tmsk:0x%4.4x\n", ebch[j].trigger_mask);
00761 }
00762 }
00763
00764 return SUCCESS;
00765 }
00766
00767
00768 INT source_unbooking()
00769 {
00770 INT i, status;
00771
00772
00773 if (ebch[0].pfragment == NULL)
00774 return EB_SUCCESS;
00775
00776
00777 for (i = 0; i< nfragment; i++) {
00778 bm_empty_buffers();
00779
00780
00781 status = bm_delete_request(ebch[i].req_id);
00782 if (debug)
00783 printf("unbook: bm_delete_req[%d] req_id:%d stat:%d\n", i, ebch[i].req_id,
00784 status);
00785
00786
00787 status = bm_close_buffer(ebch[i].hBuf);
00788 if (debug)
00789 printf("unbook: bm_close_buffer[%d] hndle:%d stat:%d\n", i, ebch[i].hBuf,
00790 status);
00791 if (status != BM_SUCCESS) {
00792 cm_msg(MERROR, "source_unbooking", "Close buffer[%d] stat:", i, status);
00793 return status;
00794 }
00795 }
00796
00797
00798 free_event_buffer(nfragment);
00799
00800 return EB_SUCCESS;
00801 }
00802
00803
00804 INT close_buffers(void)
00805 {
00806 INT status;
00807 char error[256];
00808 EQUIPMENT *eq;
00809
00810 eq = &equipment[0];
00811
00812
00813 bm_flush_cache(equipment[0].buffer_handle, SYNC);
00814
00815 eb_end_of_run(run_number, error);
00816
00817 handFlush();
00818
00819 status = source_unbooking();
00820
00821
00822 stop_time = ss_millitime() - request_stop_time;
00823 sprintf(error, "Run %d Stop after %1.0lf events sent DT:%d[ms]",
00824 run_number, eq->stats.events_sent, stop_time);
00825 cm_msg(MINFO, "EBuilder", "%s", error);
00826
00827 run_state = STATE_STOPPED;
00828 abort_requested = FALSE;
00829 return status;
00830 }
00831
00832
00833
00834
00835
00836
00837
00838
00839
00840
00841
00842
00843
00844
00845
00846
00847
00848
00849
00850
00851
00852
00853 INT source_scan(INT fmt, EQUIPMENT_INFO *eq_info)
00854 {
00855 static char bars[] = "|/-\\";
00856 static int i_bar;
00857 static DWORD serial;
00858 DWORD *plrl;
00859 BOOL complete;
00860 INT i, status, size;
00861 INT act_size;
00862 BOOL found, event_mismatch;
00863 BANK_HEADER *psbh;
00864
00865
00866 for (i = 0; i < nfragment; i++) {
00867
00868 if (ebset.preqfrag[i] && !ebset.received[i]) {
00869
00870 size = max_event_size;
00871 status = bm_receive_event(ebch[i].hBuf, ebch[i].pfragment, &size, ASYNC);
00872 switch (status) {
00873 case BM_SUCCESS:
00874
00875 ebset.received[i] = TRUE;
00876
00877 ebch[i].serial = ((EVENT_HEADER *) ebch[i].pfragment)->serial_number;
00878
00879
00880 switch (fmt) {
00881 case FORMAT_YBOS:
00882 plrl = (DWORD *) (((EVENT_HEADER *) ebch[i].pfragment) + 1);
00883 ybos_event_swap(plrl);
00884 break;
00885 case FORMAT_MIDAS:
00886 psbh = (BANK_HEADER *) (((EVENT_HEADER *) ebch[i].pfragment) + 1);
00887 bk_swap(psbh, FALSE);
00888 break;
00889 }
00890
00891 if (debug1) {
00892 printf("SUCC: ch:%d ser:%d rec:%d sz:%d\n", i,
00893 ebch[i].serial, ebset.received[i], size);
00894 }
00895 break;
00896 case BM_ASYNC_RETURN:
00897 ebch[i].timeout++;
00898 if (debug1) {
00899 printf("ASYNC: ch:%d ser:%d rec:%d sz:%d\n", i,
00900 ebch[i].serial, ebset.received[i], size);
00901 }
00902 break;
00903 default:
00904 cm_msg(MERROR, "event_scan", "bm_receive_event error %d", status);
00905 return status;
00906 break;
00907 }
00908 }
00909 }
00910
00911
00912 complete = FALSE;
00913 for (i = 0; i < nfragment;i++) {
00914 if (ebset.preqfrag[i] && !ebset.received[i])
00915 break;
00916 }
00917 if (i == nfragment) {
00918 complete = TRUE;
00919
00920 found = event_mismatch = FALSE;
00921
00922 for (i = 0; i < nfragment; i++) {
00923 if (ebset.preqfrag[i] && ebset.received[i] && !found) {
00924 serial = ebch[i].serial;
00925 found = TRUE;
00926 } else {
00927 if (ebset.preqfrag[i] && ebset.received[i] && (serial != ebch[i].serial)) {
00928
00929 event_mismatch = TRUE;
00930 }
00931 }
00932 }
00933
00934
00935 if (event_mismatch && debug) {
00936 char str[256];
00937 char strsub[128];
00938 strcpy(str, "event mismatch: ");
00939 for (i = 0; i < nfragment; i++) {
00940 sprintf(strsub, "Ser[%d]:%d ", i, ebch[i].serial);
00941 strcat(str, strsub);
00942 }
00943 printf("event serial mismatch %s\n", str);
00944 }
00945
00946
00947 memset(dest_event, 0, sizeof(EVENT_HEADER));
00948 act_size = 0;
00949
00950
00951
00952 bm_compose_event((EVENT_HEADER *) dest_event, eq_info->event_id, eq_info->trigger_mask,
00953 act_size, ebch[0].serial);
00954
00955
00956 status = eb_user(nfragment, event_mismatch, ebch
00957 , (EVENT_HEADER *) dest_event,(void *) ((EVENT_HEADER *) dest_event + 1), &act_size);
00958 if (status != EB_SUCCESS) {
00959 if (status == EB_SKIP) {
00960
00961 for (i = 0; i < nfragment; i++) {
00962 ebch[i].timeout = 0;
00963 ebset.received[i] = FALSE;
00964 }
00965 }
00966 return status;
00967 }
00968
00969
00970 if (!ebset.user_build) {
00971 for (i = 0; i < nfragment; i++) {
00972 if (ebset.preqfrag[i]) {
00973 status = meb_fragment_add(dest_event, ebch[i].pfragment, &act_size);
00974 if (status != EB_SUCCESS) {
00975 cm_msg(MERROR, "source_scan",
00976 "compose fragment:%d current size:%d (%d)", i, act_size, status);
00977 return EB_ERROR;
00978 }
00979 }
00980 }
00981 }
00982
00983
00984 act_size = ((EVENT_HEADER *) dest_event)->data_size + sizeof(EVENT_HEADER);
00985
00986
00987 status = rpc_send_event(equipment[0].buffer_handle, dest_event, act_size, SYNC);
00988 if (status != BM_SUCCESS) {
00989 if (debug)
00990 printf("rpc_send_event returned error %d, event_size %d\n",
00991 status, act_size);
00992 cm_msg(MERROR, "EBuilder", "%s: rpc_send_event returned error %d", full_frontend_name, status);
00993 return EB_ERROR;
00994 }
00995
00996
00997 equipment[0].bytes_sent += act_size;
00998
00999
01000 equipment[0].events_sent++;
01001
01002
01003 for (i = 0; i < nfragment; i++) {
01004 ebch[i].timeout = 0;
01005 ebset.received[i] = FALSE;
01006 }
01007 }
01008
01009 return status;
01010 }
01011
01012
01013 int main(unsigned int argc, char **argv)
01014 {
01015 INT status;
01016 unsigned int i;
01017 BOOL daemon = FALSE;
01018
01019
01020 memset(&ebch[0], 0, sizeof(ebch));
01021
01022
01023 cm_get_environment(host_name, sizeof(host_name), expt_name, sizeof(expt_name));
01024
01025
01026 for (i = 1; i < argc; i++) {
01027 if (argv[i][0] == '-' && argv[i][1] == 'd')
01028 debug = TRUE;
01029 else if (argv[i][0] == '-' && argv[i][1] == 'D')
01030 daemon = TRUE;
01031 else if (argv[i][0] == '-' && argv[i][1] == 'w')
01032 wheel = TRUE;
01033 else if (argv[i][0] == '-') {
01034 if (i + 1 >= argc || argv[i + 1][0] == '-')
01035 goto usage;
01036 if (strncmp(argv[i], "-e", 2) == 0)
01037 strcpy(expt_name, argv[++i]);
01038 else if (strncmp(argv[i], "-h", 2) == 0)
01039 strcpy(host_name, argv[++i]);
01040 else if (strncmp(argv[i], "-b", 2) == 0)
01041 strcpy(buffer_name, argv[++i]);
01042 } else {
01043 usage:
01044 printf("usage: mevb [-h <Hostname>] [-e <Experiment>] -b <buffername> [-d debug]\n");
01045 printf(" -w show wheel -D to start as a daemon\n\n");
01046 return 0;
01047 }
01048 }
01049
01050 printf("Program mevb version 5 started\n\n");
01051 if (daemon) {
01052 printf("Becoming a daemon...\n");
01053 ss_daemon_init(FALSE);
01054 }
01055
01056
01057 if (buffer_name[0] == 0) {
01058 printf("Buffer name must be specified with -b argument\n");
01059 goto exit;
01060 }
01061
01062
01063 strcpy(full_frontend_name, frontend_name);
01064
01065
01066 status = cm_connect_experiment(host_name, expt_name, full_frontend_name, NULL);
01067 if (status != CM_SUCCESS) {
01068 ss_sleep(5000);
01069 goto exit;
01070 }
01071
01072 if (debug)
01073 cm_set_watchdog_params(TRUE, 0);
01074
01075
01076 status = cm_get_experiment_database(&hDB, &hKey);
01077 if (status != EB_SUCCESS) {
01078 ss_sleep(5000);
01079 goto exit;
01080 }
01081
01082
01083 status = cm_exist(full_frontend_name, FALSE);
01084 if (status == CM_SUCCESS) {
01085 cm_msg(MERROR, "Ebuilder", "%s running already!.", full_frontend_name);
01086 cm_disconnect_experiment();
01087 goto exit;
01088 }
01089
01090 if (ebuilder_init() != SUCCESS) {
01091 cm_disconnect_experiment();
01092
01093 ss_sleep(5000);
01094 goto exit;
01095 }
01096
01097
01098 status = register_equipment();
01099 if (status != EB_SUCCESS) {
01100 ss_sleep(5000);
01101 goto exit;
01102 }
01103
01104
01105 status = load_fragment();
01106 if (status != EB_SUCCESS) {
01107 ss_sleep(5000);
01108 goto exit;
01109 }
01110
01111
01112 if (cm_register_transition(TR_START, tr_start, 400) != CM_SUCCESS)
01113 return status;
01114 if (cm_register_transition(TR_STOP, tr_stop, 600) != CM_SUCCESS)
01115 goto exit;
01116
01117
01118 status = scan_fragment();
01119 printf("%s-Out of scan_fragment\n", full_frontend_name);
01120
01121
01122 printf("%s-Unbooking\n", full_frontend_name);
01123 source_unbooking();
01124
01125 ebuilder_exit();
01126
01127 exit:
01128
01129 free_event_buffer(ebset.nfragment);
01130
01131
01132 cm_disconnect_experiment();
01133 return 0;
01134 }