mevb.c

Go to the documentation of this file.
00001 /********************************************************************\
00002 Name:         mevb.c
00003 Created by:   Pierre-Andre Amaudruz
00004 
00005 Contents:     Main Event builder task.
00006 
00007   $Id: mevb.c 2753 2005-10-07 14:55:31Z ritt $
00008 
00009 \********************************************************************/
00010 
00011 /**dox***************************************************************/
00012 /* @file mevb.c
00013 The Event builder main file 
00014 */
00015 
00016 #include <stdio.h>
00017 #include "midas.h"
00018 #include "mevb.h"
00019 #include "msystem.h"
00020 #include "ybos.h"
00021 
00022 #define SERVER_CACHE_SIZE  100000       /* event cache before buffer */
00023 
00024 #define ODB_UPDATE_TIME      1000       /* 1 seconds for ODB update */
00025 
00026 #define DEFAULT_FE_TIMEOUT  60000       /* 60 seconds for watchdog timeout */
00027 
00028 EBUILDER_SETTINGS ebset;
00029 EBUILDER_CHANNEL ebch[MAX_CHANNELS];
00030 
00031 INT run_state;                     /* STATE_RUNNING, STATE_STOPPED, STATE_PAUSED */
00032 INT run_number;
00033 DWORD last_time;
00034 DWORD actual_time;                 /* current time in seconds since 1970 */
00035 DWORD actual_millitime;            /* current time in milliseconds */
00036 
00037 char host_name[HOST_NAME_LENGTH];
00038 char expt_name[NAME_LENGTH];
00039 char full_frontend_name[256];
00040 char buffer_name[NAME_LENGTH];
00041 INT   nfragment;
00042 char *dest_event;
00043 HNDLE hDB, hKey, hStatKey, hSubkey, hEqKey, hESetKey;
00044 BOOL debug = FALSE, debug1 = FALSE;
00045 
00046 BOOL wheel = FALSE;
00047 char bars[] = "|\\-/";
00048 int i_bar;
00049 BOOL abort_requested = FALSE, stop_requested = TRUE;
00050 DWORD stop_time = 0, request_stop_time = 0;
00051 
00052 INT(*meb_fragment_add) (char *, char *, INT *);
00053 INT handFlush(void);
00054 INT source_booking(void);
00055 INT source_unbooking(void);
00056 INT close_buffers(void);
00057 INT source_scan(INT fmt, EQUIPMENT_INFO *eq_info);
00058 INT eb_mfragment_add(char *pdest, char *psrce, INT * size);
00059 INT eb_yfragment_add(char *pdest, char *psrce, INT * size);
00060 
00061 INT eb_begin_of_run(INT, char *, char *);
00062 INT eb_end_of_run(INT, char *);
00063 INT eb_user(INT, BOOL mismatch, EBUILDER_CHANNEL *, EVENT_HEADER *, void *, INT *);
00064 INT load_fragment(void);
00065 INT scan_fragment(void);
00066 extern char *frontend_name;
00067 extern char *frontend_file_name;
00068 extern BOOL frontend_call_loop;
00069 
00070 extern INT max_event_size;
00071 extern INT max_event_size_frag;
00072 extern INT event_buffer_size;
00073 extern INT display_period;
00074 extern INT ebuilder_init(void);
00075 extern INT ebuilder_exit(void);
00076 extern INT ebuilder_loop(void);
00077 
00078 extern EQUIPMENT equipment[];
00079 extern INT ybos_event_swap(DWORD * pevt);
00080 
00081 #define EQUIPMENT_COMMON_STR "\
00082 Event ID = WORD : 0\n\
00083 Trigger mask = WORD : 0\n\
00084 Buffer = STRING : [32] SYSTEM\n\
00085 Type = INT : 0\n\
00086 Source = INT : 0\n\
00087 Format = STRING : [8] FIXED\n\
00088 Enabled = BOOL : 0\n\
00089 Read on = INT : 0\n\
00090 Period = INT : 0\n\
00091 Event limit = DOUBLE : 0\n\
00092 Num subevents = DWORD : 0\n\
00093 Log history = INT : 0\n\
00094 Frontend host = STRING : [32] \n\
00095 Frontend name = STRING : [32] \n\
00096 Frontend file name = STRING : [256] \n\
00097 "
00098 
00099 #define EQUIPMENT_STATISTICS_STR "\
00100 Events sent = DOUBLE : 0\n\
00101 Events per sec. = DOUBLE : 0\n\
00102 kBytes per sec. = DOUBLE : 0\n\
00103 "
00104 
00105 /********************************************************************/
00106 INT register_equipment(void)
00107 {
00108   INT index, size, status;
00109   char str[256];
00110   EQUIPMENT_INFO *eq_info;
00111   EQUIPMENT_STATS *eq_stats;
00112   HNDLE hKey;
00113 
00114   /* get current ODB run state */
00115   size = sizeof(run_state);
00116   run_state = STATE_STOPPED;
00117   db_get_value(hDB, 0, "/Runinfo/State", &run_state, &size, TID_INT, TRUE);
00118   size = sizeof(run_number);
00119   run_number = 1;
00120   status = db_get_value(hDB, 0, "/Runinfo/Run number", &run_number, &size, TID_INT, TRUE);
00121   assert(status == SUCCESS);
00122 
00123   /* scan EQUIPMENT table from mevb.C */
00124   for (index = 0; equipment[index].name[0]; index++) {
00125     eq_info = &equipment[index].info;
00126     eq_stats = &equipment[index].stats;
00127 
00128     if (eq_info->event_id == 0) {
00129       printf("\nEvent ID 0 for %s not allowed\n", equipment[index].name);
00130       cm_disconnect_experiment();
00131       ss_sleep(5000);
00132       exit(0);
00133     }
00134 
00135     /* init status */
00136     equipment[index].status = EB_SUCCESS;
00137 
00138     sprintf(str, "/Equipment/%s/Common", equipment[index].name);
00139 
00140     /* get last event limit from ODB */
00141     if (eq_info->eq_type != EQ_SLOW) {
00142       db_find_key(hDB, 0, str, &hKey);
00143       size = sizeof(double);
00144       if (hKey)
00145         db_get_value(hDB, hKey, "Event limit", &eq_info->event_limit, &size,
00146         TID_DOUBLE, TRUE);
00147     }
00148 
00149     /* Create common subtree */
00150     status = db_check_record(hDB, 0, str, EQUIPMENT_COMMON_STR, TRUE);
00151     if (status != DB_SUCCESS) {
00152       printf("Cannot check equipment record, status = %d\n", status);
00153       ss_sleep(3000);
00154     }
00155     db_find_key(hDB, 0, str, &hKey);
00156 
00157     if (equal_ustring(eq_info->format, "YBOS"))
00158       equipment[index].format = FORMAT_YBOS;
00159     else if (equal_ustring(eq_info->format, "FIXED"))
00160       equipment[index].format = FORMAT_FIXED;
00161     else    /* default format is MIDAS */
00162       equipment[index].format = FORMAT_MIDAS;
00163 
00164     gethostname(eq_info->frontend_host, sizeof(eq_info->frontend_host));
00165     strcpy(eq_info->frontend_name, full_frontend_name);
00166     strcpy(eq_info->frontend_file_name, frontend_file_name);
00167 
00168     /* set record from equipment[] table in frontend.c */
00169     db_set_record(hDB, hKey, eq_info, sizeof(EQUIPMENT_INFO), 0);
00170 
00171     /* get record once at the start equipment info */
00172     size = sizeof(EQUIPMENT_INFO);
00173     db_get_record(hDB, hKey, eq_info, &size, 0);
00174 
00175     /*---- Create just the key , leave it empty ---------------------------------*/
00176     sprintf(str, "/Equipment/%s/Variables", equipment[index].name);
00177     db_create_key(hDB, 0, str, TID_KEY);
00178     db_find_key(hDB, 0, str, &hKey);
00179     equipment[index].hkey_variables = hKey;
00180 
00181     /*---- Create and initialize statistics tree -------------------*/
00182     sprintf(str, "/Equipment/%s/Statistics", equipment[index].name);
00183 
00184     status = db_check_record(hDB, 0, str, EQUIPMENT_STATISTICS_STR, TRUE);
00185     if (status != DB_SUCCESS) {
00186       printf("Cannot create/check statistics record, error %d\n", status);
00187       ss_sleep(3000);
00188     }
00189 
00190     status = db_find_key(hDB, 0, str, &hKey);
00191     if (status != DB_SUCCESS) {
00192       printf("Cannot find statistics record, error %d\n", status);
00193       ss_sleep(3000);
00194     }
00195 
00196     eq_stats->events_sent = 0;
00197     eq_stats->events_per_sec = 0;
00198     eq_stats->kbytes_per_sec = 0;
00199 
00200     /* open hot link to statistics tree */
00201     status = db_open_record(hDB, hKey, eq_stats, sizeof(EQUIPMENT_STATS)
00202       , MODE_WRITE, NULL, NULL);
00203     if (status != DB_SUCCESS) {
00204       cm_msg(MERROR, "register_equipment", 
00205         "Cannot open statistics record, error %d. Probably other FE is using it",
00206         status);
00207       ss_sleep(3000);
00208     }
00209 
00210     /*---- open event buffer ---------------------------------------*/
00211     if (eq_info->buffer[0]) {
00212       status = bm_open_buffer(eq_info->buffer, EVENT_BUFFER_SIZE,
00213         &equipment[index].buffer_handle);
00214       if (status != BM_SUCCESS && status != BM_CREATED) {
00215         cm_msg(MERROR, "register_equipment",
00216           "Cannot open event buffer. Try to reduce EVENT_BUFFER_SIZE in midas.h \
00217           and rebuild the system.");
00218         return 0;
00219       }
00220 
00221       /* set the default buffer cache size */
00222       bm_set_cache_size(equipment[index].buffer_handle, 0, SERVER_CACHE_SIZE);
00223     } else {
00224       cm_msg(MERROR, "register_equipment", "Destination buffer must be present");
00225       ss_sleep(3000);
00226       exit(0);
00227     }
00228   }
00229   return SUCCESS;
00230 }
00231 
00232 /********************************************************************/
00233 INT load_fragment(void)
00234 {
00235   INT i, size, type;
00236   HNDLE hEqKey, hSubkey;
00237   EQUIPMENT_INFO *eq_info;
00238   KEY  key;
00239   char  buffer[NAME_LENGTH];
00240   char  format[8];
00241 
00242   /* Get equipment pointer, only one eqp for now */
00243   eq_info = &equipment[0].info;
00244 
00245   /* Scan Equipment/Common listing */
00246   if (db_find_key(hDB, 0, "Equipment", &hEqKey) != DB_SUCCESS) {
00247     cm_msg(MINFO, "load_fragment", "Equipment listing not found");
00248     return EB_ERROR;
00249   }
00250 
00251   /* Scan the Equipment list for fragment info collection */
00252   for (i = 0, nfragment=0 ; ; i++) {
00253     db_enum_key(hDB, hEqKey, i, &hSubkey);
00254     if (!hSubkey)
00255       break;
00256     db_get_key(hDB, hSubkey, &key);
00257     if (key.type == TID_KEY) {
00258       /* Equipment name */
00259       if (debug) printf("Equipment name:%s\n", key.name);
00260       /* Check if equipment is EQ_EB */
00261       size = sizeof(INT);
00262       db_get_value(hDB, hSubkey, "common/type", &type, &size, TID_INT, 0);
00263       size = sizeof(buffer);
00264       db_get_value(hDB, hSubkey, "common/Buffer", buffer, &size, TID_STRING, 0);
00265       size = sizeof(format);
00266       db_get_value(hDB, hSubkey, "common/Format", format, &size, TID_STRING, 0);
00267       /* Check if equipment match EB requirements */
00268       if ((type & EQ_EB)
00269         && (strncmp(buffer, buffer_name, strlen(buffer_name)) == 0)
00270         && (strncmp(format, eq_info->format, strlen(format)) == 0)) {
00271           /* match=> fill internal eb structure */
00272           strcpy(ebch[nfragment].format, format);
00273           strcpy(ebch[nfragment].buffer, buffer);
00274           size = sizeof(WORD);
00275           db_get_value(hDB, hSubkey, "common/Trigger Mask", &ebch[nfragment].trigger_mask, &size, TID_WORD, 0);
00276           size = sizeof(WORD);
00277           db_get_value(hDB, hSubkey, "common/Event ID", &ebch[nfragment].event_id, &size, TID_WORD, 0);
00278           nfragment++;
00279         }
00280     }
00281   }
00282 
00283   printf("Found %d fragment matching EB setting\n", nfragment);
00284   /* Point to the Ebuilder settings */
00285   /* Set fragment_add function based on the format */
00286   if (equipment[0].format == FORMAT_MIDAS)
00287     meb_fragment_add = eb_mfragment_add;
00288   else if (equipment[0].format == FORMAT_YBOS)
00289     meb_fragment_add = eb_yfragment_add;
00290   else {
00291     cm_msg(MERROR, "mevb", "Unknown data format :%d", format);
00292     return EB_ERROR;
00293   }
00294 
00295   /* allocate destination event buffer */
00296   dest_event = (char *) malloc(nfragment * (max_event_size + sizeof(EVENT_HEADER)));
00297   memset(dest_event, 0, nfragment * (max_event_size + sizeof(EVENT_HEADER)));
00298   if (dest_event == NULL) {
00299     cm_msg(MERROR, "EBuilder", "%s: Not enough memory for event buffer", full_frontend_name);
00300     return EB_ERROR;
00301   }
00302   return EB_SUCCESS;
00303 }
00304 
00305 /********************************************************************/
00306 INT scan_fragment(void)
00307 {
00308   INT fragn, status;
00309   EQUIPMENT *eq;
00310   EQUIPMENT_INFO *eq_info;
00311 
00312   /* Get equipment pointer, only one eqp for now */
00313   eq_info = &equipment[0].info;
00314 
00315   /* Main event loop */
00316   do {
00317     switch (run_state) {
00318       case STATE_STOPPED:
00319       case STATE_PAUSED:
00320         /* skip the source scan and yield */
00321         status = cm_yield(500);
00322         if (wheel) {
00323           printf("...%c Snoring\r", bars[i_bar++ % 4]);
00324           fflush(stdout);
00325         }
00326         break;
00327       case STATE_RUNNING:
00328         status = source_scan(equipment[0].format, eq_info);
00329         switch (status) {
00330             case BM_ASYNC_RETURN:  // No event found for now, Check for timeout 
00331               for (fragn = 0; fragn < nfragment; fragn++) {
00332                 if (ebch[fragn].timeout > TIMEOUT) {        /* Timeout */
00333                   if (stop_requested) {                     /* Stop */
00334                     if (debug) printf("Stop requested on timeout %d\n", status);
00335                       status = close_buffers();
00336                     break;
00337                   } 
00338                   else {         /* No stop requested  but timeout */
00339                     if (wheel) {
00340                       printf("...%c Timoing on %1.0lf\r", bars[i_bar++ % 4],
00341                         eq->stats.events_sent);
00342                       fflush(stdout);
00343                       status = cm_yield(50);
00344                     }
00345                   }
00346                 }
00347                 //else { /* No timeout loop back */
00348               } // for loop over all fragments
00349               break;
00350             case EB_ERROR:
00351             case EB_USER_ERROR:
00352               abort_requested = TRUE;
00353               if (status == EB_USER_ERROR)
00354                 cm_msg(MTALK, "EBuilder", "%s: Error signaled by user code - stopping run...", full_frontend_name);
00355               else
00356                 cm_msg(MTALK, "EBuilder", "%s: Event mismatch - Stopping run...", full_frontend_name);
00357               if (cm_transition(TR_STOP, 0, NULL, 0, ASYNC, 0) != CM_SUCCESS) {
00358                 cm_msg(MERROR, "EBuilder", "%s: Stop Transition request failed", full_frontend_name);
00359                 return status;
00360               }
00361               if (debug) printf("Stop requested on Error %d\n", status);
00362               status = close_buffers();
00363               return status;
00364               break;
00365             case EB_SUCCESS:
00366             case EB_SKIP:
00367               //   Normal path if event has been assembled
00368               //   No yield in this case.
00369               break;
00370             default:
00371               cm_msg(MERROR, "Source_scan", "unexpected return %d", status);
00372               status = SS_ABORT;
00373               }  // switch scan_source
00374         break;
00375     }
00376     /* EB job done, update statistics if its time */
00377     /* Check if it's time to do statistics job */
00378     if ((actual_millitime = ss_millitime()) - last_time > 1000) {
00379       /* Force event to appear at the destination if Ebuilder is remote */
00380       rpc_flush_event();
00381       /* Force event ot appear at the destination if Ebuilder is local */
00382       bm_flush_cache(equipment[0].buffer_handle, ASYNC);
00383 
00384       status = cm_yield(10);
00385 
00386       eq = &equipment[0];
00387       eq->stats.events_sent += eq->events_sent;
00388       eq->stats.events_per_sec =
00389         eq->events_sent / ((actual_millitime - last_time) / 1000.0);
00390       eq->stats.kbytes_per_sec =
00391         eq->bytes_sent / 1024.0 / ((actual_millitime - last_time) /
00392         1000.0);
00393       eq->bytes_sent = 0;
00394       eq->events_sent = 0;
00395       /* update destination statistics */
00396       db_send_changed_records();
00397       /* Keep track of last ODB update */
00398       last_time = ss_millitime();
00399     }
00400   } while (status != RPC_SHUTDOWN && status != SS_ABORT);
00401 
00402   return status;
00403   }
00404 
00405 /********************************************************************/
00406 INT eb_mfragment_add(char *pdest, char *psrce, INT * size)
00407 {
00408    BANK_HEADER *psbh, *pdbh;
00409    char *psdata, *pddata;
00410    INT bksize;
00411 
00412    /* Condition for new EVENT the data_size should be ZERO */
00413    *size = ((EVENT_HEADER *) pdest)->data_size;
00414 
00415    /* destination pointer */
00416    pddata = pdest + *size + sizeof(EVENT_HEADER);
00417 
00418    if (*size) {
00419       /* NOT the first fragment */
00420 
00421       /* Swap event source if necessary */
00422       psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
00423       bk_swap(psbh, FALSE);
00424 
00425       /* source pointer */
00426       psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
00427       psdata = (char *) (psbh + 1);
00428 
00429       /* copy all banks without the bank header */
00430       bksize = psbh->data_size;
00431 
00432       /* copy */
00433       memcpy(pddata, psdata, bksize);
00434 
00435       /* update event size */
00436       ((EVENT_HEADER *) pdest)->data_size += bksize;
00437 
00438       /* update bank size */
00439       pdbh = (BANK_HEADER *) (((EVENT_HEADER *) pdest) + 1);
00440       pdbh->data_size += bksize;
00441 
00442       *size = ((EVENT_HEADER *) pdest)->data_size;
00443    } else {
00444       /* First event without the event header but with the 
00445          bank header as the size is zero */
00446       *size = ((EVENT_HEADER *) psrce)->data_size;
00447 
00448       /* Swap event if necessary */
00449       psbh = (BANK_HEADER *) (((EVENT_HEADER *) psrce) + 1);
00450       bk_swap(psbh, FALSE);
00451 
00452       /* copy first fragment */
00453       memcpy(pddata, psbh, *size);
00454 
00455       /* update destination event size */
00456       ((EVENT_HEADER *) pdest)->data_size = *size;
00457    }
00458    return CM_SUCCESS;
00459 }
00460 
00461 /*--------------------------------------------------------------------*/
00462 INT eb_yfragment_add(char *pdest, char *psrce, INT * size)
00463 {
00464    /* pdest : EVENT_HEADER pointer
00465       psrce : EVENT_HEADER pointer
00466       Keep pbkh for later incrementation
00467     */
00468    char *psdata, *pddata;
00469    DWORD *pslrl, *pdlrl;
00470    INT i4frgsize, i1frgsize, status;
00471 
00472    /* Condition for new EVENT the data_size should be ZERO */
00473    *size = ((EVENT_HEADER *) pdest)->data_size;
00474 
00475    /* destination pointer skip the header as it has been already
00476       composed and the usere may have modified it on purpose (Midas Control) */
00477    pddata = pdest + *size + sizeof(EVENT_HEADER);
00478 
00479    /* the Midas header is present for logger */
00480    if (*size) {                 /* already filled with a fragment */
00481 
00482       /* source pointer: number of DWORD (lrl included) */
00483       pslrl = (DWORD *) (((EVENT_HEADER *) psrce) + 1);
00484 
00485       /* Swap event if necessary */
00486       status = ybos_event_swap(pslrl);
00487 
00488       /* copy done in bytes, do not include LRL */
00489       psdata = (char *) (pslrl + 1);
00490 
00491       /* copy size in I*4 (lrl included, remove it) */
00492       i4frgsize = (*pslrl);
00493       i1frgsize = 4 * i4frgsize;
00494 
00495       /* append fragment */
00496       memcpy(pddata, psdata, i1frgsize);
00497 
00498       /* update Midas header event size */
00499       ((EVENT_HEADER *) pdest)->data_size += i1frgsize;
00500 
00501       /* update LRL size (I*4) */
00502       pdlrl = (DWORD *) (((EVENT_HEADER *) pdest) + 1);
00503       *pdlrl += i4frgsize;
00504 
00505       /* Return event size in bytes */
00506       *size = ((EVENT_HEADER *) pdest)->data_size;
00507    } else {                     /* new destination event */
00508       /* The composed event has already the MIDAS header.
00509          which may have been modified by the user in ebuser.c
00510          Will be stripped by the logger (YBOS).
00511          Copy the first full event ( no EVID suppression )
00512          First event (without the event header) */
00513 
00514       /* source pointer */
00515       pslrl = (DWORD *) (((EVENT_HEADER *) psrce) + 1);
00516 
00517       /* Swap event if necessary */
00518       status = ybos_event_swap(pslrl);
00519 
00520       /* size in byte from the source midas header */
00521       *size = ((EVENT_HEADER *) psrce)->data_size;
00522 
00523       /* copy first fragment */
00524       memcpy(pddata, (char *) pslrl, *size);
00525 
00526       /* update destination Midas header event size */
00527       ((EVENT_HEADER *) pdest)->data_size += *size;
00528 
00529    }
00530    return CM_SUCCESS;
00531 }
00532 
00533 /*--------------------------------------------------------------------*/
00534 INT tr_start(INT rn, char *error)
00535 {
00536   EBUILDER(ebuilder_str);
00537   INT status, size, i;
00538   char   str[128];
00539   KEY    key;
00540   HNDLE  hKey, hEqkey, hEqFRkey;
00541   EQUIPMENT_INFO *eq_info;
00542   
00543 
00544   eq_info = &equipment[0].info;
00545 
00546   /* Get update eq_info from ODB */
00547   sprintf(str, "/Equipment/%s/Common", equipment[0].name);
00548   status = db_find_key(hDB, 0, str, &hKey);
00549   size = sizeof(EQUIPMENT_INFO);
00550   db_get_record(hDB, hKey, eq_info, &size, 0);
00551 
00552   ebset.nfragment = nfragment;
00553 
00554   /* reset serial numbers */
00555   for (i = 0; equipment[i].name[0]; i++) {
00556     equipment[i].serial_number = 1;
00557     equipment[i].subevent_number = 0;
00558     equipment[i].stats.events_sent = 0;
00559     equipment[i].odb_in = equipment[i].odb_out = 0;
00560   }
00561 
00562   /* Get / Set Settings */
00563   sprintf(str, "/Equipment/%s/Settings", equipment[0].name);
00564   if (db_find_key(hDB, 0, str, &hEqkey) != DB_SUCCESS) {
00565     status = db_create_record(hDB, 0, str, strcomb(ebuilder_str));
00566   }
00567 
00568   /* Keep Key on Ebuilder/Settings */
00569   sprintf(str, "/Equipment/%s/Settings", equipment[0].name);
00570   if (db_find_key(hDB, 0, str, &hEqkey) != DB_SUCCESS) {
00571     cm_msg(MINFO, "load_fragment", "/Equipment/%s/Settings not found", equipment[0].name);
00572   }
00573   
00574   /* Update or Create User_field */
00575   size = sizeof(ebset.user_field);
00576   status = db_get_value(hDB, hEqkey, "User Field", ebset.user_field, &size, TID_STRING, TRUE);
00577 
00578   /* Update or Create User_Build */
00579   size = sizeof(ebset.user_build);
00580   status = db_get_value(hDB, hEqkey, "User Build", &ebset.user_build, &size, TID_BOOL, TRUE);
00581 
00582   /* update ODB */
00583   size = sizeof(INT);
00584   status = db_set_value(hDB, hEqkey, "Number of Fragment", &ebset.nfragment, size, 1, TID_INT);  
00585 
00586     /* Create or update the fragment request list */
00587   status = db_find_key(hDB, hEqkey, "Fragment Required", &hEqFRkey);
00588   status = db_get_key (hDB, hEqFRkey, &key);
00589   if (key.num_values != ebset.nfragment) {
00590     cm_msg(MINFO, "mevb", "Number of Fragment mismatch ODB:%d - CUR:%d", key.num_values, ebset.nfragment);
00591     free (ebset.preqfrag);
00592     size = ebset.nfragment*sizeof(BOOL);
00593     ebset.preqfrag = malloc(size);
00594     for (i=0 ; i<ebset.nfragment ; i++)
00595       ebset.preqfrag[i] = TRUE;
00596     status = db_set_value(hDB, hEqkey, "Fragment Required", ebset.preqfrag, size, ebset.nfragment, TID_BOOL);  
00597   } else {  // Take from ODBedit
00598     size = key.total_size;
00599     free (ebset.preqfrag);
00600     ebset.preqfrag = malloc(size);
00601     status = db_get_data(hDB, hEqFRkey, ebset.preqfrag, &size, TID_BOOL);
00602   }
00603   /* Cleanup fragment flags */
00604   free (ebset.received);
00605   ebset.received = malloc(size);
00606   for (i=0 ; i < ebset.nfragment ; i++) 
00607     ebset.received[i] = FALSE;
00608 
00609   /* Call BOR user function */
00610   status = eb_begin_of_run(run_number, ebset.user_field, error);
00611   if (status != EB_SUCCESS) {
00612     cm_msg(MERROR, "eb_prestart", "run start aborted due to eb_begin_of_run (%d)",
00613       status);
00614     return status;
00615   }
00616 
00617   /* Book all fragment */
00618   status = source_booking();
00619   if (status != SUCCESS)
00620     return status;
00621 
00622   if (!eq_info->enabled) {
00623     cm_msg(MINFO,"ebuilder", "Event Builder disabled");
00624     return CM_SUCCESS;
00625   }
00626 
00627   /* local run state */
00628   run_state = STATE_RUNNING;
00629   run_number = rn;
00630   stop_requested = FALSE;
00631   abort_requested = FALSE;
00632   printf("%s-Starting New Run: %d\n", full_frontend_name, rn);
00633 
00634   /* Reset global trigger mask */
00635   return CM_SUCCESS;
00636 }
00637 
00638 /*--------------------------------------------------------------------*/
00639 INT tr_stop(INT rn, char *error)
00640 {
00641    printf("\n%s-Stopping Run: %d detected\n", full_frontend_name, rn);
00642 
00643    /* local stop */
00644    stop_requested = TRUE;
00645 
00646    /* local stop time */
00647    request_stop_time = ss_millitime();
00648    return CM_SUCCESS;
00649 }
00650 
00651 /*--------------------------------------------------------------------*/
00652 void free_event_buffer(INT nfrag)
00653 {
00654    INT i;
00655    for (i = 0; i < nfrag; i++) {
00656       if (ebch[i].pfragment) {
00657          free(ebch[i].pfragment);
00658          ebch[i].pfragment = NULL;
00659       }
00660    }
00661 }
00662 
00663 /*--------------------------------------------------------------------*/
00664 INT handFlush()
00665 {
00666    int i, size, status;
00667    char strout[256];
00668 
00669    /* Do Hand flush until better way to  garantee the input buffer to be empty */
00670    if (debug)
00671      printf("Hand flushing system buffer... \n");
00672    for (i = 0; i < nfragment; i++) {
00673      do {
00674        if (ebset.preqfrag[i]) { 
00675          size = max_event_size;
00676          status = bm_receive_event(ebch[i].hBuf, ebch[i].pfragment, &size, ASYNC);
00677          if (debug1) {
00678            sprintf(strout,
00679              "booking:Hand flush bm_receive_event[%d] hndle:%d stat:%d  Last Ser:%d",
00680              i, ebch[i].hBuf, status,
00681              ((EVENT_HEADER *) ebch[i].pfragment)->serial_number);
00682            printf("%s\n", strout);
00683          }
00684        }
00685      } while (status == BM_SUCCESS);
00686    }
00687 
00688    /* Empty source buffer */
00689    status = bm_empty_buffers();
00690    if (status != BM_SUCCESS)
00691       cm_msg(MERROR, "handFlush", "bm_empty_buffers failure [%d]", status);
00692    run_state = STATE_STOPPED;
00693    return status;
00694 }
00695 
00696 
00697 /*--------------------------------------------------------------------*/
00698 INT source_booking()
00699 {
00700    INT j, i, status, status1, status2;
00701 
00702    if (debug)
00703       printf("Entering booking\n");
00704 
00705    /* Book all the source channels */
00706    for (i = 0; i < nfragment; i++) {
00707       /* Book only the requested event mask */
00708      if (ebset.preqfrag[i]) {
00709          /* Connect channel to source buffer */
00710          status1 = bm_open_buffer(ebch[i].buffer, EVENT_BUFFER_SIZE, &(ebch[i].hBuf));
00711 
00712          if (debug)
00713            printf("bm_open_buffer frag:%d buf:%s handle:%d stat:%d\n",
00714                    i, ebch[i].buffer, ebch[i].hBuf, status1);
00715          /* Register for specified channel event ID and Trigger mask */
00716          status2 =
00717              bm_request_event(ebch[i].hBuf, ebch[i].event_id,
00718                               ebch[i].trigger_mask, GET_ALL, &ebch[i].req_id, NULL);
00719          if (debug)
00720            printf("bm_request_event frag:%d id:%d msk:%d req_id:%d stat:%d\n",
00721                    i, ebch[i].event_id, ebch[i].trigger_mask, ebch[i].req_id, status2);
00722          if (((status1 != BM_SUCCESS) && (status1 != BM_CREATED)) ||
00723              ((status2 != BM_SUCCESS) && (status2 != BM_CREATED))) {
00724             cm_msg(MERROR, "source_booking",
00725                    "Open buffer/event request failure [%d %d %d]", i, status1, status2);
00726             return BM_CONFLICT;
00727          }
00728 
00729          /* allocate local source event buffer */
00730          if (ebch[i].pfragment)
00731             free(ebch[i].pfragment);
00732          ebch[i].pfragment = (char *) malloc(max_event_size + sizeof(EVENT_HEADER));
00733          if (debug)
00734             printf("malloc pevent frag:%d pevent:%p\n", i, ebch[i].pfragment);
00735          if (ebch[i].pfragment == NULL) {
00736             free_event_buffer(nfragment);
00737             cm_msg(MERROR, "source_booking", "Can't allocate space for buffer");
00738             return BM_NO_MEMORY;
00739          }
00740       }
00741    }
00742 
00743    /* Empty source buffer */
00744    status = bm_empty_buffers();
00745    if (status != BM_SUCCESS) {
00746       cm_msg(MERROR, "source_booking", "bm_empty_buffers failure [%d]", status);
00747       return status;
00748    }
00749 
00750    if (debug) {
00751       printf("bm_empty_buffers stat:%d\n", status);
00752       for (j = 0; j < ebset.nfragment; j++) {
00753          printf(" buff:%s", ebch[j].buffer);
00754          printf(" ser#:%d", ebch[j].serial);
00755          printf(" hbuf:%2d", ebch[j].hBuf);
00756          printf(" rqid:%2d", ebch[j].req_id);
00757          printf(" opst:%d", status1);
00758          printf(" rqst:%d", status2);
00759          printf(" evid:%2d", ebch[j].event_id);
00760          printf(" tmsk:0x%4.4x\n", ebch[j].trigger_mask);
00761       }
00762    }
00763 
00764    return SUCCESS;
00765 }
00766 
00767 /*--------------------------------------------------------------------*/
00768 INT source_unbooking()
00769 {
00770    INT i, status;
00771 
00772    /* Skip unbooking if already done */
00773    if (ebch[0].pfragment == NULL)
00774       return EB_SUCCESS;
00775 
00776    /* unbook all source channels */
00777    for (i = 0; i< nfragment; i++) {
00778       bm_empty_buffers();
00779 
00780       /* Remove event ID registration */
00781       status = bm_delete_request(ebch[i].req_id);
00782       if (debug)
00783          printf("unbook: bm_delete_req[%d] req_id:%d stat:%d\n", i, ebch[i].req_id,
00784                 status);
00785 
00786       /* Close source buffer */
00787       status = bm_close_buffer(ebch[i].hBuf);
00788       if (debug)
00789          printf("unbook: bm_close_buffer[%d] hndle:%d stat:%d\n", i, ebch[i].hBuf,
00790                 status);
00791       if (status != BM_SUCCESS) {
00792          cm_msg(MERROR, "source_unbooking", "Close buffer[%d] stat:", i, status);
00793          return status;
00794       }
00795    }
00796 
00797    /* release local event buffer memory */
00798    free_event_buffer(nfragment);
00799 
00800    return EB_SUCCESS;
00801 }
00802 
00803 /*--------------------------------------------------------------------*/
00804 INT close_buffers(void)
00805 {
00806   INT status;
00807   char error[256];
00808   EQUIPMENT *eq;
00809 
00810   eq = &equipment[0];
00811 
00812   /* Flush local destination cache */
00813   bm_flush_cache(equipment[0].buffer_handle, SYNC);
00814   /* Call user function */
00815   eb_end_of_run(run_number, error);
00816   /* Cleanup buffers */
00817   handFlush();
00818   /* Detach all source from midas */
00819   status = source_unbooking();
00820 
00821   /* Compose message */
00822   stop_time = ss_millitime() - request_stop_time;
00823   sprintf(error, "Run %d Stop after %1.0lf events sent DT:%d[ms]",
00824     run_number, eq->stats.events_sent, stop_time);
00825   cm_msg(MINFO, "EBuilder", "%s", error);
00826 
00827   run_state = STATE_STOPPED;
00828   abort_requested = FALSE;
00829   return status;
00830 }
00831 
00832 /********************************************************************/
00833 /**
00834 Scan all the fragment source once per call.
00835 
00836 -# This will retrieve the full midas event not swapped (except the
00837 MIDAS_HEADER) for each fragment if possible. The fragment will
00838 be stored in the channel event pointer.
00839 -# if after a full nfrag path some frag are still not cellected, it
00840 returns with the frag# missing for timeout check.
00841 -# If ALL fragments are present it will check the midas serial#
00842 for a full match across all the fragments.
00843 -# If the serial check fails it returns with "event mismatch"
00844 and will abort the event builder but not stop the run for now.
00845 -# If the serial check is passed, it will call the user_build function
00846 where the destination event is going to be composed.
00847 
00848 @param fmt Fragment format type 
00849 @param eq_info Equipement pointer
00850 @return   EB_NO_MORE_EVENT, EB_COMPOSE_TIMEOUT
00851 if different then SUCCESS (bm_compose, rpc_sent error)
00852 */
00853 INT source_scan(INT fmt, EQUIPMENT_INFO *eq_info)
00854 {
00855    static char bars[] = "|/-\\";
00856    static int i_bar;
00857    static DWORD serial;
00858    DWORD *plrl;
00859    BOOL   complete;
00860    INT i, status, size;
00861    INT act_size;
00862    BOOL found, event_mismatch;
00863    BANK_HEADER *psbh;
00864 
00865    /* Scan all channels at least once */
00866    for (i = 0; i < nfragment; i++) {
00867       /* Check if current channel needs to be received */
00868      if (ebset.preqfrag[i] && !ebset.received[i]) {
00869          /* Get fragment and store it in ebch[i].pfragment */
00870          size = max_event_size;
00871          status = bm_receive_event(ebch[i].hBuf, ebch[i].pfragment, &size, ASYNC);
00872          switch (status) {
00873          case BM_SUCCESS:      /* event received */
00874             /* Mask event */
00875             ebset.received[i] = TRUE;
00876             /* Keep local serial */
00877             ebch[i].serial = ((EVENT_HEADER *) ebch[i].pfragment)->serial_number;
00878 
00879             /* Swap event depending on data format */
00880             switch (fmt) {
00881             case FORMAT_YBOS:
00882                plrl = (DWORD *) (((EVENT_HEADER *) ebch[i].pfragment) + 1);
00883                ybos_event_swap(plrl);
00884                break;
00885             case FORMAT_MIDAS:
00886                psbh = (BANK_HEADER *) (((EVENT_HEADER *) ebch[i].pfragment) + 1);
00887                bk_swap(psbh, FALSE);
00888                break;
00889             }
00890 
00891             if (debug1) {
00892               printf("SUCC: ch:%d ser:%d rec:%d sz:%d\n", i,
00893                       ebch[i].serial, ebset.received[i], size);
00894             }
00895             break;
00896          case BM_ASYNC_RETURN: /* timeout */
00897             ebch[i].timeout++;
00898             if (debug1) {
00899               printf("ASYNC: ch:%d ser:%d rec:%d sz:%d\n", i,
00900                       ebch[i].serial, ebset.received[i], size);
00901             }
00902             break;
00903          default:              /* Error */
00904             cm_msg(MERROR, "event_scan", "bm_receive_event error %d", status);
00905             return status;
00906             break;
00907          }
00908       }    /* next channel */
00909    }
00910 
00911    /* Check if all fragments have been received */
00912    complete = FALSE;
00913    for (i = 0; i < nfragment;i++) {
00914      if (ebset.preqfrag[i] && !ebset.received[i])
00915        break;
00916    }
00917    if (i == nfragment) {
00918      complete = TRUE;
00919      /* Check if serial matches */
00920      found = event_mismatch = FALSE;
00921      /* Check Serial, mark first serial */
00922      for (i = 0; i < nfragment; i++) {
00923          if (ebset.preqfrag[i] && ebset.received[i] && !found) {
00924             serial = ebch[i].serial;
00925             found = TRUE;
00926          } else {
00927             if (ebset.preqfrag[i] && ebset.received[i] && (serial != ebch[i].serial)) {
00928                /* Event mismatch */
00929                event_mismatch = TRUE;
00930             }
00931          }
00932       }
00933 
00934     /* internal action in case of event mismatch */
00935      if (event_mismatch && debug) {
00936        char str[256];
00937        char strsub[128];
00938        strcpy(str, "event mismatch: ");
00939        for (i = 0; i < nfragment; i++) {
00940          sprintf(strsub, "Ser[%d]:%d ", i, ebch[i].serial);
00941          strcat(str, strsub);
00942        }
00943        printf("event serial mismatch %s\n", str);
00944      }
00945 
00946      /* In any case reset destination buffer */
00947      memset(dest_event, 0, sizeof(EVENT_HEADER));
00948      act_size = 0;
00949 
00950      /* Fill reserved header space of destination event with
00951      final header information */
00952      bm_compose_event((EVENT_HEADER *) dest_event, eq_info->event_id, eq_info->trigger_mask,
00953        act_size, ebch[0].serial);
00954 
00955      /* Pass fragments to user with mismatch flag, for final check before assembly */
00956      status = eb_user(nfragment, event_mismatch, ebch
00957        , (EVENT_HEADER *) dest_event,(void *) ((EVENT_HEADER *) dest_event + 1), &act_size);
00958      if (status != EB_SUCCESS) {
00959        if (status == EB_SKIP) {
00960          /* Reset mask and timeouts as if event has been successfully send out */
00961          for (i = 0; i < nfragment; i++) {
00962            ebch[i].timeout = 0;
00963            ebset.received[i] = FALSE;
00964          }
00965        }
00966        return status;        // Event mark as EB_SKIP or EB_ABORT by user
00967      }
00968 
00969      /* Allow bypass of fragment assembly if user did it on its own */
00970      if (!ebset.user_build) {
00971        for (i = 0; i < nfragment; i++) {
00972          if (ebset.preqfrag[i]) {
00973            status = meb_fragment_add(dest_event, ebch[i].pfragment, &act_size);
00974            if (status != EB_SUCCESS) {
00975              cm_msg(MERROR, "source_scan",
00976                "compose fragment:%d current size:%d (%d)", i, act_size, status);
00977              return EB_ERROR;
00978            }
00979          }
00980        }
00981      }
00982 
00983      /* Overall event to be sent */
00984      act_size = ((EVENT_HEADER *) dest_event)->data_size + sizeof(EVENT_HEADER);
00985 
00986      /* Send event and wait for completion */
00987      status = rpc_send_event(equipment[0].buffer_handle, dest_event, act_size, SYNC);
00988      if (status != BM_SUCCESS) {
00989        if (debug)
00990          printf("rpc_send_event returned error %d, event_size %d\n",
00991          status, act_size);
00992        cm_msg(MERROR, "EBuilder", "%s: rpc_send_event returned error %d", full_frontend_name, status);
00993        return EB_ERROR;
00994      }
00995 
00996      /* Keep track of the total byte count */
00997      equipment[0].bytes_sent += act_size;
00998 
00999      /* update destination event count */
01000      equipment[0].events_sent++;
01001 
01002      /* Reset mask and timeouts as even thave been succesfully send */
01003      for (i = 0; i < nfragment; i++) {
01004        ebch[i].timeout = 0;
01005        ebset.received[i] = FALSE;
01006      }
01007    } // all fragment recieved for this event
01008 
01009    return status;
01010 }
01011 
01012 /*--------------------------------------------------------------------*/
01013 int main(unsigned int argc, char **argv)
01014 {
01015   INT status;
01016   unsigned int i;
01017   BOOL daemon = FALSE;
01018   
01019   /* init structure */
01020   memset(&ebch[0], 0, sizeof(ebch));
01021 
01022   /* set default */
01023   cm_get_environment(host_name, sizeof(host_name), expt_name, sizeof(expt_name));
01024 
01025   /* get parameters */
01026   for (i = 1; i < argc; i++) {
01027     if (argv[i][0] == '-' && argv[i][1] == 'd')
01028       debug = TRUE;
01029     else if (argv[i][0] == '-' && argv[i][1] == 'D')
01030       daemon = TRUE;
01031     else if (argv[i][0] == '-' && argv[i][1] == 'w')
01032       wheel = TRUE;
01033     else if (argv[i][0] == '-') {
01034       if (i + 1 >= argc || argv[i + 1][0] == '-')
01035         goto usage;
01036       if (strncmp(argv[i], "-e", 2) == 0)
01037         strcpy(expt_name, argv[++i]);
01038       else if (strncmp(argv[i], "-h", 2) == 0)
01039         strcpy(host_name, argv[++i]);
01040       else if (strncmp(argv[i], "-b", 2) == 0)
01041         strcpy(buffer_name, argv[++i]);
01042     } else {
01043 usage:
01044       printf("usage: mevb [-h <Hostname>] [-e <Experiment>] -b <buffername> [-d debug]\n");
01045       printf("             -w show wheel -D to start as a daemon\n\n");
01046       return 0;
01047     }
01048   }
01049 
01050   printf("Program mevb version 5 started\n\n");
01051   if (daemon) {
01052     printf("Becoming a daemon...\n");
01053     ss_daemon_init(FALSE);
01054   }
01055 
01056   /* Check buffer arg */
01057   if (buffer_name[0] == 0) {
01058     printf("Buffer name must be specified with -b argument\n");
01059     goto exit;
01060   }
01061 
01062   /* Compose frontend name */
01063    strcpy(full_frontend_name, frontend_name);
01064 
01065   /* Connect to experiment */
01066   status = cm_connect_experiment(host_name, expt_name, full_frontend_name, NULL);
01067   if (status != CM_SUCCESS) {
01068     ss_sleep(5000);
01069     goto exit;
01070   }
01071 
01072   if (debug)
01073     cm_set_watchdog_params(TRUE, 0);
01074 
01075   /* Connect to ODB */
01076   status = cm_get_experiment_database(&hDB, &hKey);
01077   if (status != EB_SUCCESS) {
01078     ss_sleep(5000);
01079     goto exit;
01080   }
01081   
01082   /* check if Ebuilder is already running */
01083   status = cm_exist(full_frontend_name, FALSE);
01084   if (status == CM_SUCCESS) {
01085     cm_msg(MERROR, "Ebuilder", "%s running already!.", full_frontend_name);
01086     cm_disconnect_experiment();
01087     goto exit;
01088   }
01089 
01090   if (ebuilder_init() != SUCCESS) {
01091     cm_disconnect_experiment();
01092     /* let user read message before window might close */
01093     ss_sleep(5000);
01094     goto exit;
01095   }
01096 
01097   /* Register single equipment */
01098   status = register_equipment();
01099   if (status != EB_SUCCESS) {
01100     ss_sleep(5000);
01101     goto exit;
01102   }
01103 
01104   /* Load Fragment info */
01105   status = load_fragment();
01106   if (status != EB_SUCCESS) {
01107     ss_sleep(5000);
01108     goto exit;
01109   }
01110 
01111   /* Register transition for reset counters */
01112   if (cm_register_transition(TR_START, tr_start, 400) != CM_SUCCESS)
01113     return status;
01114   if (cm_register_transition(TR_STOP, tr_stop, 600) != CM_SUCCESS)
01115     goto exit;
01116 
01117   /* Scan fragments... will stay in */
01118   status = scan_fragment();
01119   printf("%s-Out of scan_fragment\n", full_frontend_name);
01120 
01121   /* Detach all source from midas */
01122   printf("%s-Unbooking\n", full_frontend_name);
01123   source_unbooking();
01124   
01125   ebuilder_exit();
01126 
01127 exit:
01128   /* Free local memory */
01129   free_event_buffer(ebset.nfragment);
01130 
01131   /* Clean disconnect from midas */
01132   cm_disconnect_experiment();
01133   return 0;
01134 }

Midas DOC Version 1.9.5 ---- PSI Stefan Ritt ----
Contributions: Pierre-Andre Amaudruz - Sergio Ballestrero - Suzannah Daviel - Doxygen - Peter Green - Qing Gu - Greg Hackman - Gertjan Hofman - Paul Knowles - Rudi Meier - Glenn Moloney - Dave Morris - John M O'Donnell - Konstantin Olchanski - Renee Poutissou - Tamsen Schurman - Andreas Suter - Jan M.Wouters - Piotr Adam Zolnierczuk