mfe.c

Go to the documentation of this file.
00001 /********************************************************************\
00002 
00003   Name:         mfe.c
00004   Created by:   Stefan Ritt
00005 
00006   Contents:     The system part of the MIDAS frontend. Has to be
00007                 linked with user code to form a complete frontend
00008 
00009   $Id: mfe.c 3197 2006-07-31 19:01:21Z ritt $
00010 
00011 \********************************************************************/
00012 
00013 #include <stdio.h>
00014 #include <assert.h>
00015 #include "midas.h"
00016 #include "msystem.h"
00017 #include "mcstd.h"
00018 
00019 #ifdef YBOS_SUPPORT
00020 #include "ybos.h"
00021 #endif
00022 
00023 /*------------------------------------------------------------------*/
00024 
00025 /* items defined in frontend.c */
00026 
00027 extern char *frontend_name;
00028 extern char *frontend_file_name;       /* copied into the "Frontend file name" field of each Common record */
00029 extern BOOL frontend_call_loop;
00030 extern INT max_event_size;             /* limit checked by send_event() for an unfragmented event incl. header */
00031 extern INT max_event_size_frag;        /* size of the buffer send_event() allocates for fragmented events */
00032 extern INT event_buffer_size;
00033 extern INT display_period;             /* non-zero enables the printf()/ss_printf() status output */
00034 extern INT frontend_init(void);
00035 extern INT frontend_exit(void);
00036 extern INT frontend_loop(void);
00037 extern INT begin_of_run(INT run_number, char *error);   /* called from tr_start() */
00038 extern INT end_of_run(INT run_number, char *error);     /* called from tr_stop() */
00039 extern INT pause_run(INT run_number, char *error);      /* called from tr_pause() */
00040 extern INT resume_run(INT run_number, char *error);     /* called from tr_resume() */
00041 extern INT poll_event(INT source, INT count, BOOL test);        /* called with test=TRUE during poll calibration */
00042 extern INT interrupt_configure(INT cmd, INT source, POINTER_T adr);     /* used to attach interrupt_routine() */
00043 
00044 /*------------------------------------------------------------------*/
00045 
00046 /* globals */
00047 
00048 #undef USE_EVENT_CHANNEL        /* when undefined, send_event() uses bm_send_event() instead of dm_pointer_increment() */
00049 
00050 #define SERVER_CACHE_SIZE  100000       /* event cache before buffer, passed to bm_set_cache_size() */
00051 
00052 #define ODB_UPDATE_TIME      1000       /* 1 second for ODB update */
00053 
00054 #define DEFAULT_FE_TIMEOUT  60000       /* 60 seconds for watchdog timeout */
00055 
00056 INT run_state;                  /* STATE_RUNNING, STATE_STOPPED, STATE_PAUSED */
00057 INT run_number;                 /* read from /Runinfo/Run number, updated by the transition callbacks */
00058 DWORD actual_time;              /* current time in seconds since 1970 */
00059 DWORD actual_millitime;         /* current time in milliseconds */
00060 
00061 char host_name[HOST_NAME_LENGTH];
00062 char exp_name[NAME_LENGTH];
00063 char full_frontend_name[256];   /* copied into the "Frontend name" field of each Common record */
00064 
00065 INT max_bytes_per_sec;
00066 INT optimize = 0;               /* set this to one to optimize TCP buffer size */
00067 INT fe_stop = 0;                /* stop switch for VxWorks */
00068 BOOL debug;                     /* disable watchdog messages from server */
00069 DWORD auto_restart = 0;         /* restart run after event limit reached stop */
00070 INT manual_trigger_event_id = 0;        /* set from the manual_trigger callback */
00071 INT frontend_index = -1;        /* frontend index for event building, -1 if not given */
00072 
00073 HNDLE hDB;                      /* ODB handle passed to the db_* calls below */
00074 
#ifdef YBOS_SUPPORT
/* Maps a YBOS bank type to the ODB type id and the item size in bytes
   used when update_odb() writes a YBOS bank.  The list is terminated
   by an entry whose ybos_type is 0. */
struct {
   DWORD ybos_type;             /* YBOS bank type code */
   DWORD odb_type;              /* corresponding TID_xxx */
   INT tsize;                   /* size of one item in bytes */
} id_map[] = {
   {A1_BKTYPE, TID_CHAR, 1},
   {I1_BKTYPE, TID_BYTE, 1},
   {I2_BKTYPE, TID_WORD, 2},
   {I4_BKTYPE, TID_DWORD, 4},
   {F4_BKTYPE, TID_FLOAT, 4},
   {D8_BKTYPE, TID_DOUBLE, 8},
   {0, 0, 0}
};
#endif
00091 
00092 extern EQUIPMENT equipment[];   /* user equipment table, terminated by an entry with an empty name */
00093 
00094 EQUIPMENT *interrupt_eq = NULL; /* the single interrupt equipment, set by register_equipment() */
00095 EVENT_HEADER *interrupt_odb_buffer;     /* allocated in register_equipment(): MAX_EVENT_SIZE plus one event header */
00096 BOOL interrupt_odb_buffer_valid;
00097 BOOL slowcont_eq = FALSE;       /* set to TRUE when an EQ_SLOW equipment is registered */
00098 
00099 int send_event(INT index);
00100 void send_all_periodic_events(INT transition);
00101 void interrupt_routine(void);
00102 void interrupt_enable(BOOL flag);
00103 void display(BOOL bInit);
00104 
00105 /*---- ODB records -------------------------------------------------*/
00106 
/* Record description for /Equipment/<name>/Common.  register_equipment()
   checks/creates the record from this string and transfers it as an
   EQUIPMENT_INFO structure.
   NOTE(review): presumably the entries must stay in the same order and
   size as the EQUIPMENT_INFO members -- confirm against midas.h. */
00107 #define EQUIPMENT_COMMON_STR "\
00108 Event ID = WORD : 0\n\
00109 Trigger mask = WORD : 0\n\
00110 Buffer = STRING : [32] SYSTEM\n\
00111 Type = INT : 0\n\
00112 Source = INT : 0\n\
00113 Format = STRING : [8] FIXED\n\
00114 Enabled = BOOL : 0\n\
00115 Read on = INT : 0\n\
00116 Period = INT : 0\n\
00117 Event limit = DOUBLE : 0\n\
00118 Num subevents = DWORD : 0\n\
00119 Log history = INT : 0\n\
00120 Frontend host = STRING : [32] \n\
00121 Frontend name = STRING : [32] \n\
00122 Frontend file name = STRING : [256] \n\
00123 "
00124 
/* Record description for /Equipment/<name>/Statistics; register_equipment()
   opens a hot link of size sizeof(EQUIPMENT_STATS) to this record. */
00125 #define EQUIPMENT_STATISTICS_STR "\
00126 Events sent = DOUBLE : 0\n\
00127 Events per sec. = DOUBLE : 0\n\
00128 kBytes per sec. = DOUBLE : 0\n\
00129 "
00130 
00131 /*-- transition callbacks ------------------------------------------*/
00132 
00133 /*-- start ---------------------------------------------------------*/
00134 
00135 INT tr_start(INT rn, char *error)
00136 {
00137    INT i, status;
00138 
00139    /* reset serial numbers */
00140    for (i = 0; equipment[i].name[0]; i++) {
00141       equipment[i].serial_number = 1;
00142       equipment[i].subevent_number = 0;
00143       equipment[i].stats.events_sent = 0;
00144       equipment[i].odb_in = equipment[i].odb_out = 0;
00145    }
00146 
00147    status = begin_of_run(rn, error);
00148 
00149    if (status == CM_SUCCESS) {
00150       run_state = STATE_RUNNING;
00151       run_number = rn;
00152 
00153       send_all_periodic_events(TR_START);
00154 
00155       if (display_period) {
00156          ss_printf(14, 2, "Running ");
00157          ss_printf(36, 2, "%d", rn);
00158       }
00159 
00160       /* enable interrupts */
00161       interrupt_enable(TRUE);
00162    }
00163 
00164    return status;
00165 }
00166 
00167 /*-- stop ----------------------------------------------------------*/
00168 
00169 INT tr_stop(INT rn, char *error)
00170 {
00171    INT status, i;
00172    EQUIPMENT *eq;
00173 
00174    /* disable interrupts */
00175    interrupt_enable(FALSE);
00176 
00177    status = end_of_run(rn, error);
00178 
00179    if (status == CM_SUCCESS) {
00180       /* don't send events if already stopped */
00181       if (run_state != STATE_STOPPED)
00182          send_all_periodic_events(TR_STOP);
00183 
00184       run_state = STATE_STOPPED;
00185       run_number = rn;
00186 
00187       if (display_period)
00188          ss_printf(14, 2, "Stopped ");
00189    } else
00190       interrupt_enable(TRUE);
00191 
00192    /* flush remaining buffered events */
00193    rpc_flush_event();
00194    for (i = 0; equipment[i].name[0]; i++)
00195       if (equipment[i].buffer_handle) {
00196          INT err = bm_flush_cache(equipment[i].buffer_handle, SYNC);
00197          if (err != BM_SUCCESS) {
00198             cm_msg(MERROR, "tr_prestop", "bm_flush_cache(SYNC) error %d", err);
00199             return err;
00200          }
00201       }
00202 
00203    /* update final statistics record in ODB */
00204    for (i = 0; equipment[i].name[0]; i++) {
00205       eq = &equipment[i];
00206       eq->stats.events_sent += eq->events_sent;
00207       eq->bytes_sent = 0;
00208       eq->events_sent = 0;
00209    }
00210 
00211    db_send_changed_records();
00212 
00213    return status;
00214 }
00215 
00216 /*-- pause ---------------------------------------------------------*/
00217 
00218 INT tr_pause(INT rn, char *error)
00219 {
00220    INT status;
00221 
00222    /* disable interrupts */
00223    interrupt_enable(FALSE);
00224 
00225    status = pause_run(rn, error);
00226 
00227    if (status == CM_SUCCESS) {
00228       run_state = STATE_PAUSED;
00229       run_number = rn;
00230 
00231       send_all_periodic_events(TR_PAUSE);
00232 
00233       if (display_period)
00234          ss_printf(14, 2, "Paused  ");
00235    } else
00236       interrupt_enable(TRUE);
00237 
00238    return status;
00239 }
00240 
00241 /*-- resume --------------------------------------------------------*/
00242 
00243 INT tr_resume(INT rn, char *error)
00244 {
00245    INT status;
00246 
00247    status = resume_run(rn, error);
00248 
00249    if (status == CM_SUCCESS) {
00250       run_state = STATE_RUNNING;
00251       run_number = rn;
00252 
00253       send_all_periodic_events(TR_RESUME);
00254 
00255       if (display_period)
00256          ss_printf(14, 2, "Running ");
00257 
00258       /* enable interrupts */
00259       interrupt_enable(TRUE);
00260    }
00261 
00262    return status;
00263 }
00264 
00265 /*------------------------------------------------------------------*/
00266 
00267 INT manual_trigger(INT index, void *prpc_param[])
00268 {
00269    manual_trigger_event_id = CWORD(0);
00270    return SUCCESS;
00271 }
00272 
00273 /*------------------------------------------------------------------*/
00274 
00275 INT register_equipment(void)
00276 {
00277    INT index, count, size, status, i, j, k, n;
00278    char str[256];
00279    EQUIPMENT_INFO *eq_info;
00280    EQUIPMENT_STATS *eq_stats;
00281    DWORD start_time, delta_time;
00282    HNDLE hKey;
00283    BOOL manual_trig_flag = FALSE;
00284    BANK_LIST *bank_list;
00285    DWORD dummy;
00286 
00287    /* get current ODB run state */
00288    size = sizeof(run_state);
00289    run_state = STATE_STOPPED;
00290    db_get_value(hDB, 0, "/Runinfo/State", &run_state, &size, TID_INT, TRUE);
00291    size = sizeof(run_number);
00292    run_number = 1;
00293    status =
00294        db_get_value(hDB, 0, "/Runinfo/Run number", &run_number, &size, TID_INT, TRUE);
00295    assert(status == SUCCESS);
00296 
00297    /* scan EQUIPMENT table from FRONTEND.C */
00298    for (index = 0; equipment[index].name[0]; index++) {
00299       eq_info = &equipment[index].info;
00300       eq_stats = &equipment[index].stats;
00301 
00302       if (eq_info->event_id == 0) {
00303          printf("\nEvent ID 0 for %s not allowed\n", equipment[index].name);
00304          cm_disconnect_experiment();
00305          ss_sleep(5000);
00306          exit(0);
00307       }
00308 
00309       /* init status */
00310       equipment[index].status = FE_SUCCESS;
00311 
00312       /* check for event builder event */
00313       if (eq_info->eq_type & EQ_EB) {
00314 
00315          if (frontend_index == -1) {
00316             printf("\nEquipment \"%s\" has EQ_EB set, but no", equipment[index].name);
00317             printf(" index specified via \"-i\" flag.\nExiting.");
00318             cm_disconnect_experiment();
00319             ss_sleep(5000);
00320             exit(0);
00321          }
00322 
00323          /* modify equipment name to <name>xx where xx is the frontend index */
00324          sprintf(equipment[index].name + strlen(equipment[index].name), "%02d",
00325                  frontend_index);
00326 
00327          /* modify event buffer name to <name>xx where xx is the frontend index */
00328          sprintf(eq_info->buffer + strlen(eq_info->buffer), "%02d", frontend_index);
00329       }
00330 
00331       sprintf(str, "/Equipment/%s/Common", equipment[index].name);
00332 
00333       /* get last event limit from ODB */
00334       if (eq_info->eq_type != EQ_SLOW) {
00335          db_find_key(hDB, 0, str, &hKey);
00336          size = sizeof(double);
00337          if (hKey)
00338             db_get_value(hDB, hKey, "Event limit", &eq_info->event_limit, &size,
00339                          TID_DOUBLE, TRUE);
00340       }
00341 
00342       /* Create common subtree */
00343       status = db_check_record(hDB, 0, str, EQUIPMENT_COMMON_STR, FALSE);
00344       if (status == DB_NO_KEY || status == DB_STRUCT_MISMATCH) {
00345          db_create_record(hDB, 0, str, EQUIPMENT_COMMON_STR);
00346          db_find_key(hDB, 0, str, &hKey);
00347          db_set_record(hDB, hKey, eq_info, sizeof(EQUIPMENT_INFO), 0);
00348       } else if (status != DB_SUCCESS) {
00349          printf("Cannot check equipment record, status = %d\n", status);
00350          ss_sleep(3000);
00351       }
00352       db_find_key(hDB, 0, str, &hKey);
00353       assert(hKey);
00354 
00355       /* open hot link to equipment info */
00356       db_open_record(hDB, hKey, eq_info, sizeof(EQUIPMENT_INFO), MODE_READ, NULL, NULL);
00357 
00358       if (equal_ustring(eq_info->format, "YBOS"))
00359          equipment[index].format = FORMAT_YBOS;
00360       else if (equal_ustring(eq_info->format, "FIXED"))
00361          equipment[index].format = FORMAT_FIXED;
00362       else                      /* default format is MIDAS */
00363          equipment[index].format = FORMAT_MIDAS;
00364 
00365       gethostname(eq_info->frontend_host, sizeof(eq_info->frontend_host));
00366       strcpy(eq_info->frontend_name, full_frontend_name);
00367       strcpy(eq_info->frontend_file_name, frontend_file_name);
00368 
00369       /* update variables in ODB */
00370       db_set_record(hDB, hKey, eq_info, sizeof(EQUIPMENT_INFO), 0);
00371 
00372       /*---- Create variables record ---------------------------------*/
00373 
00374       sprintf(str, "/Equipment/%s/Variables", equipment[index].name);
00375       if (equipment[index].event_descrip) {
00376          if (equipment[index].format == FORMAT_FIXED)
00377             db_check_record(hDB, 0, str, (char *) equipment[index].event_descrip, TRUE);
00378          else {
00379             /* create bank descriptions */
00380             bank_list = (BANK_LIST *) equipment[index].event_descrip;
00381 
00382             for (; bank_list->name[0]; bank_list++) {
00383                /* mabye needed later...
00384                   if (bank_list->output_flag == 0)
00385                   continue;
00386                 */
00387 
00388                if (bank_list->type == TID_STRUCT) {
00389                   sprintf(str, "/Equipment/%s/Variables/%s", equipment[index].name,
00390                           bank_list->name);
00391                   status =
00392                       db_check_record(hDB, 0, str, strcomb(bank_list->init_str), TRUE);
00393                   if (status != DB_SUCCESS) {
00394                      printf("Cannot check/create record \"%s\", status = %d\n", str,
00395                             status);
00396                      ss_sleep(3000);
00397                   }
00398                } else {
00399                   sprintf(str, "/Equipment/%s/Variables/%s", equipment[index].name,
00400                           bank_list->name);
00401                   dummy = 0;
00402                   db_set_value(hDB, 0, str, &dummy, rpc_tid_size(bank_list->type), 1,
00403                                bank_list->type);
00404                }
00405             }
00406          }
00407       } else
00408          db_create_key(hDB, 0, str, TID_KEY);
00409 
00410       sprintf(str, "/Equipment/%s/Variables", equipment[index].name);
00411       db_find_key(hDB, 0, str, &hKey);
00412       equipment[index].hkey_variables = hKey;
00413 
00414       /*---- Create and initialize statistics tree -------------------*/
00415 
00416       sprintf(str, "/Equipment/%s/Statistics", equipment[index].name);
00417 
00418       status = db_check_record(hDB, 0, str, EQUIPMENT_STATISTICS_STR, TRUE);
00419       if (status != DB_SUCCESS) {
00420          printf("Cannot create/check statistics record \'%s\', error %d\n", str, status);
00421          ss_sleep(3000);
00422       }
00423 
00424       status = db_find_key(hDB, 0, str, &hKey);
00425       if (status != DB_SUCCESS) {
00426          printf("Cannot find statistics record \'%s\', error %d\n", str, status);
00427          ss_sleep(3000);
00428       }
00429 
00430       eq_stats->events_sent = 0;
00431       eq_stats->events_per_sec = 0;
00432       eq_stats->kbytes_per_sec = 0;
00433 
00434       /* open hot link to statistics tree */
00435       status =
00436           db_open_record(hDB, hKey, eq_stats, sizeof(EQUIPMENT_STATS), MODE_WRITE, NULL,
00437                          NULL);
00438       if (status != DB_SUCCESS) {
00439          cm_msg(MERROR, "register_equipment",
00440                 "Cannot open statistics record \'%s\', error %d. Probably other FE is using it",
00441                 str, status);
00442          ss_sleep(3000);
00443       }
00444 
00445       /*---- open event buffer ---------------------------------------*/
00446 
00447       if (eq_info->buffer[0]) {
00448          status =
00449              bm_open_buffer(eq_info->buffer, EVENT_BUFFER_SIZE,
00450                             &equipment[index].buffer_handle);
00451          if (status != BM_SUCCESS && status != BM_CREATED) {
00452             cm_msg(MERROR, "register_equipment",
00453                    "Cannot open event buffer. Try to reduce EVENT_BUFFER_SIZE in midas.h \
00454 and rebuild the system.");
00455             return 0;
00456          }
00457 
00458          /* set the default buffer cache size */
00459          bm_set_cache_size(equipment[index].buffer_handle, 0, SERVER_CACHE_SIZE);
00460       } else
00461          equipment[index].buffer_handle = 0;
00462 
00463       /*---- evaluate polling count ----------------------------------*/
00464 
00465       if (eq_info->eq_type & EQ_POLLED) {
00466          if (display_period)
00467             printf("\nCalibrating");
00468 
00469          count = 1;
00470          do {
00471             if (display_period)
00472                printf(".");
00473 
00474             start_time = ss_millitime();
00475 
00476             poll_event(equipment[index].info.source, count, TRUE);
00477 
00478             delta_time = ss_millitime() - start_time;
00479 
00480             if (delta_time > 0)
00481                count = (INT) ((double) count * 100 / delta_time);
00482             else
00483                count *= 100;
00484          } while (delta_time > 120 || delta_time < 80);
00485 
00486          equipment[index].poll_count = (INT) ((double) eq_info->period / 100 * count);
00487 
00488          if (display_period)
00489             printf("OK\n");
00490       }
00491 
00492       /*---- initialize interrupt events -----------------------------*/
00493 
00494       if (eq_info->eq_type & EQ_INTERRUPT) {
00495          /* install interrupt for interrupt events */
00496 
00497          for (i = 0; equipment[i].name[0]; i++)
00498             if (equipment[i].info.eq_type & EQ_POLLED) {
00499                equipment[index].status = FE_ERR_DISABLED;
00500                cm_msg(MINFO, "register_equipment",
00501                       "Interrupt readout cannot be combined with polled readout");
00502             }
00503 
00504          if (equipment[index].status != FE_ERR_DISABLED) {
00505             if (eq_info->enabled) {
00506                if (interrupt_eq) {
00507                   equipment[index].status = FE_ERR_DISABLED;
00508                   cm_msg(MINFO, "register_equipment",
00509                          "Defined more than one equipment with interrupt readout");
00510                } else {
00511                   interrupt_configure(CMD_INTERRUPT_ATTACH, eq_info->source,
00512                                       (POINTER_T) interrupt_routine);
00513                   interrupt_eq = &equipment[index];
00514                   interrupt_odb_buffer = malloc(MAX_EVENT_SIZE + sizeof(EVENT_HEADER));
00515                }
00516             } else {
00517                equipment[index].status = FE_ERR_DISABLED;
00518                cm_msg(MINFO, "register_equipment",
00519                       "Equipment %s disabled in file \"frontend.c\"",
00520                       equipment[index].name);
00521             }
00522          }
00523       }
00524 
00525       /*---- initialize slow control equipment -----------------------*/
00526 
00527       if (eq_info->eq_type & EQ_SLOW) {
00528          /* resolve duplicate device names */
00529          for (i = 0; equipment[index].driver[i].name[0]; i++)
00530             for (j = i + 1; equipment[index].driver[j].name[0]; j++)
00531                if (equal_ustring(equipment[index].driver[i].name,
00532                                  equipment[index].driver[j].name)) {
00533                   strcpy(str, equipment[index].driver[i].name);
00534                   for (k = 0, n = 0; equipment[index].driver[k].name[0]; k++)
00535                      if (equal_ustring(str, equipment[index].driver[k].name))
00536                         sprintf(equipment[index].driver[k].name, "%s_%d", str, n++);
00537 
00538                   break;
00539                }
00540 
00541          /* loop over equipment list and call class driver's init method */
00542          if (eq_info->enabled)
00543             equipment[index].status = equipment[index].cd(CMD_INIT, &equipment[index]);
00544          else {
00545             equipment[index].status = FE_ERR_DISABLED;
00546             cm_msg(MINFO, "register_equipment",
00547                    "Equipment %s disabled in file \"frontend.c\"", equipment[index].name);
00548          }
00549 
00550          /* remember that we have slowcontrol equipment (needed later for scheduler) */
00551          slowcont_eq = TRUE;
00552 
00553          /* let user read error messages */
00554          if (equipment[index].status != FE_SUCCESS)
00555             ss_sleep(3000);
00556       }
00557 
00558       /*---- register callback for manual triggered events -----------*/
00559       if (eq_info->eq_type & EQ_MANUAL_TRIG) {
00560          if (!manual_trig_flag)
00561             cm_register_function(RPC_MANUAL_TRIG, manual_trigger);
00562 
00563          manual_trig_flag = TRUE;
00564       }
00565    }
00566 
00567    return SUCCESS;
00568 }
00569 
00570 /*------------------------------------------------------------------*/
00571 
00572 void update_odb(EVENT_HEADER * pevent, HNDLE hKey, INT format)
00573 {
00574    INT size, i, ni4, tsize, status, n_data;
00575    char *pdata;
00576    char name[5];
00577    BANK_HEADER *pbh;
00578    BANK *pbk;
00579    BANK32 *pbk32;
00580    char *pydata;
00581    DWORD odb_type;
00582    DWORD *pyevt, bkname;
00583    WORD bktype;
00584    HNDLE hKeyRoot, hKeyl;
00585    KEY key;
00586 
00587    /* outcommented sind db_find_key does not work in FTCP mode, SR 25.4.03
00588       rpc_set_option(-1, RPC_OTRANSPORT, RPC_FTCP); */
00589 
00590    if (format == FORMAT_FIXED) {
00591       if (db_set_record(hDB, hKey, (char *) (pevent + 1),
00592                         pevent->data_size, 0) != DB_SUCCESS)
00593          cm_msg(MERROR, "update_odb", "event #%d size mismatch", pevent->event_id);
00594    } else if (format == FORMAT_MIDAS) {
00595       pbh = (BANK_HEADER *) (pevent + 1);
00596       pbk = NULL;
00597       pbk32 = NULL;
00598       do {
00599          /* scan all banks */
00600          if (bk_is32(pbh)) {
00601             size = bk_iterate32(pbh, &pbk32, &pdata);
00602             if (pbk32 == NULL)
00603                break;
00604             bkname = *((DWORD *) pbk32->name);
00605             bktype = (WORD) pbk32->type;
00606          } else {
00607             size = bk_iterate(pbh, &pbk, &pdata);
00608             if (pbk == NULL)
00609                break;
00610             bkname = *((DWORD *) pbk->name);
00611             bktype = (WORD) pbk->type;
00612          }
00613 
00614          n_data = size;
00615          if (rpc_tid_size(bktype & 0xFF))
00616             n_data /= rpc_tid_size(bktype & 0xFF);
00617 
00618          /* get bank key */
00619          *((DWORD *) name) = bkname;
00620          name[4] = 0;
00621 
00622          if (bktype == TID_STRUCT) {
00623             status = db_find_key(hDB, hKey, name, &hKeyRoot);
00624             if (status != DB_SUCCESS) {
00625                cm_msg(MERROR, "update_odb",
00626                       "please define bank %s in BANK_LIST in frontend.c", name);
00627                continue;
00628             }
00629 
00630             /* write structured bank */
00631             for (i = 0;; i++) {
00632                status = db_enum_key(hDB, hKeyRoot, i, &hKeyl);
00633                if (status == DB_NO_MORE_SUBKEYS)
00634                   break;
00635 
00636                db_get_key(hDB, hKeyl, &key);
00637 
00638                /* adjust for alignment */
00639                if (key.type != TID_STRING && key.type != TID_LINK)
00640                   pdata =
00641                       (void *) VALIGN(pdata, MIN(ss_get_struct_align(), key.item_size));
00642 
00643                status = db_set_data(hDB, hKeyl, pdata, key.item_size * key.num_values,
00644                                     key.num_values, key.type);
00645                if (status != DB_SUCCESS) {
00646                   cm_msg(MERROR, "update_odb", "cannot write %s to ODB", name);
00647                   continue;
00648                }
00649 
00650                /* shift data pointer to next item */
00651                pdata += key.item_size * key.num_values;
00652             }
00653          } else {
00654             /* write variable length bank  */
00655             if (n_data > 0)
00656                db_set_value(hDB, hKey, name, pdata, size, n_data, bktype & 0xFF);
00657          }
00658 
00659       } while (1);
00660    } else if (format == FORMAT_YBOS) {
00661 #ifdef YBOS_SUPPORT
00662       YBOS_BANK_HEADER *pybkh;
00663 
00664       /* skip the lrl (4 bytes per event) */
00665       pyevt = (DWORD *) (pevent + 1);
00666       pybkh = NULL;
00667       do {
00668          /* scan all banks */
00669          ni4 = ybk_iterate(pyevt, &pybkh, (void *) (&pydata));
00670          if (pybkh == NULL || ni4 == 0)
00671             break;
00672 
00673          /* find the corresponding odb type */
00674          tsize = odb_type = 0;
00675          for (i = 0; id_map[0].ybos_type > 0; i++) {
00676             if (pybkh->type == id_map[i].ybos_type) {
00677                odb_type = id_map[i].odb_type;
00678                tsize = id_map[i].tsize;
00679                break;
00680             }
00681          }
00682 
00683          /* extract bank name (key name) */
00684          *((DWORD *) name) = pybkh->name;
00685          name[4] = 0;
00686 
00687          /* reject EVID bank */
00688          if (strncmp(name, "EVID", 4) == 0)
00689             continue;
00690 
00691          /* correct YBS number of entries */
00692          if (pybkh->type == D8_BKTYPE)
00693             ni4 /= 2;
00694          if (pybkh->type == I2_BKTYPE)
00695             ni4 *= 2;
00696          if (pybkh->type == I1_BKTYPE || pybkh->type == A1_BKTYPE)
00697             ni4 *= 4;
00698 
00699          /* write bank to ODB, ni4 always in I*4 */
00700          size = ni4 * tsize;
00701          if ((status =
00702               db_set_value(hDB, hKey, name, pydata, size, ni4,
00703                            odb_type & 0xFF)) != DB_SUCCESS) {
00704             printf("status:%d odb_type:%d name:%s ni4:%d size:%d tsize:%d\n", status,
00705                    odb_type, name, ni4, size, tsize);
00706             for (i = 0; i < 6; i++) {
00707                printf("data: %f\n", *((float *) (pydata)));
00708                pydata += sizeof(float);
00709             }
00710          }
00711       } while (1);
00712 #endif                          /* YBOS_SUPPORT */
00713    }
00714 
00715    rpc_set_option(-1, RPC_OTRANSPORT, RPC_TCP);
00716 }
00717 
00718 /*------------------------------------------------------------------*/
00719 
00720 int send_event(INT index)
00721 {
00722    EQUIPMENT_INFO *eq_info;
00723    EVENT_HEADER *pevent, *pfragment;
00724    char *pdata;
00725    unsigned char *pd;
00726    INT i, status;
00727    DWORD sent, size;
00728    static void *frag_buffer = NULL;
00729 
00730    eq_info = &equipment[index].info;
00731 
00732    /* check for fragmented event */
00733    if (eq_info->eq_type & EQ_FRAGMENTED) {
00734       if (frag_buffer == NULL)
00735          frag_buffer = malloc(max_event_size_frag);
00736 
00737       if (frag_buffer == NULL) {
00738          cm_msg(MERROR, "send_event",
00739                 "Not enough memory to allocate buffer for fragmented events");
00740          return SS_NO_MEMORY;
00741       }
00742 
00743       pevent = frag_buffer;
00744    } else {
00745       /* return value should be valid pointer. if NULL BIG error ==> abort  */
00746       pevent = dm_pointer_get();
00747       if (pevent == NULL) {
00748          cm_msg(MERROR, "send_event", "dm_pointer_get not returning valid pointer");
00749          return SS_NO_MEMORY;
00750       }
00751    }
00752 
00753    /* compose MIDAS event header */
00754    pevent->event_id = eq_info->event_id;
00755    pevent->trigger_mask = eq_info->trigger_mask;
00756    pevent->data_size = 0;
00757    pevent->time_stamp = ss_time();
00758    pevent->serial_number = equipment[index].serial_number++;
00759 
00760    equipment[index].last_called = ss_millitime();
00761 
00762    /* call user readout routine */
00763    *((EQUIPMENT **) (pevent + 1)) = &equipment[index];
00764    pevent->data_size = equipment[index].readout((char *) (pevent + 1), 0);
00765 
00766    /* send event */
00767    if (pevent->data_size) {
00768       if (eq_info->eq_type & EQ_FRAGMENTED) {
00769          /* fragment event */
00770          if (pevent->data_size + sizeof(EVENT_HEADER) > (DWORD) max_event_size_frag) {
00771             cm_msg(MERROR, "send_event",
00772                    "Event size %ld larger than maximum size %d for frag. ev.",
00773                    (long) (pevent->data_size + sizeof(EVENT_HEADER)),
00774                    max_event_size_frag);
00775             return SS_NO_MEMORY;
00776          }
00777 
00778          /* compose fragments */
00779          pfragment = dm_pointer_get();
00780          if (pfragment == NULL) {
00781             cm_msg(MERROR, "send_event", "dm_pointer_get not returning valid pointer");
00782             return SS_NO_MEMORY;
00783          }
00784 
00785          /* compose MIDAS event header */
00786          memcpy(pfragment, pevent, sizeof(EVENT_HEADER));
00787          pfragment->event_id |= EVENTID_FRAG1;
00788 
00789          /* store total event size */
00790          pd = (unsigned char *) (pfragment + 1);
00791          size = pevent->data_size;
00792          for (i = 0; i < 4; i++) {
00793             pd[i] = (unsigned char) (size & 0xFF);      /* little endian, please! */
00794             size >>= 8;
00795          }
00796 
00797          pfragment->data_size = sizeof(DWORD);
00798 
00799          pdata = (char *) (pevent + 1);
00800 
00801          for (i = 0, sent = 0; sent < pevent->data_size; i++) {
00802             if (i > 0) {
00803                pfragment = dm_pointer_get();
00804 
00805                /* compose MIDAS event header */
00806                memcpy(pfragment, pevent, sizeof(EVENT_HEADER));
00807                pfragment->event_id |= EVENTID_FRAG;
00808 
00809                /* copy portion of event */
00810                size = pevent->data_size - sent;
00811                if (size > max_event_size - sizeof(EVENT_HEADER))
00812                   size = max_event_size - sizeof(EVENT_HEADER);
00813 
00814                memcpy(pfragment + 1, pdata, size);
00815                pfragment->data_size = size;
00816                sent += size;
00817                pdata += size;
00818             }
00819 
00820             /* send event to buffer */
00821             if (equipment[index].buffer_handle) {
00822 #ifdef USE_EVENT_CHANNEL
00823                dm_pointer_increment(equipment[index].buffer_handle,
00824                                     pfragment->data_size + sizeof(EVENT_HEADER));
00825 #else
00826                rpc_flush_event();
00827                status = bm_send_event(equipment[index].buffer_handle, pfragment,
00828                                       pfragment->data_size + sizeof(EVENT_HEADER), SYNC);
00829                if (status != BM_SUCCESS) {
00830                   cm_msg(MERROR, "send_event", "bm_send_event(SYNC) error %d", status);
00831                   return status;
00832                }
00833 #endif
00834             }
00835          }
00836 
00837          if (equipment[index].buffer_handle) {
00838 #ifndef USE_EVENT_CHANNEL
00839             status = bm_flush_cache(equipment[index].buffer_handle, SYNC);
00840             if (status != BM_SUCCESS) {
00841                cm_msg(MERROR, "send_event", "bm_flush_cache(SYNC) error %d", status);
00842                return status;
00843             }
00844 #endif
00845          }
00846       } else {
00847          /* send unfragmented event */
00848 
00849          if (pevent->data_size + sizeof(EVENT_HEADER) > (DWORD) max_event_size) {
00850             cm_msg(MERROR, "send_event", "Event size %ld larger than maximum size %d",
00851                    (long) (pevent->data_size + sizeof(EVENT_HEADER)), max_event_size);
00852             return SS_NO_MEMORY;
00853          }
00854 
00855          /* send event to buffer */
00856          if (equipment[index].buffer_handle) {
00857 #ifdef USE_EVENT_CHANNEL
00858             dm_pointer_increment(equipment[index].buffer_handle,
00859                                  pevent->data_size + sizeof(EVENT_HEADER));
00860 #else
00861             rpc_flush_event();
00862             status = bm_send_event(equipment[index].buffer_handle, pevent,
00863                                    pevent->data_size + sizeof(EVENT_HEADER), SYNC);
00864             if (status != BM_SUCCESS) {
00865                cm_msg(MERROR, "send_event", "bm_send_event(SYNC) error %d", status);
00866                return status;
00867             }
00868             status = bm_flush_cache(equipment[index].buffer_handle, SYNC);
00869             if (status != BM_SUCCESS) {
00870                cm_msg(MERROR, "send_event", "bm_flush_cache(SYNC) error %d", status);
00871                return status;
00872             }
00873 #endif
00874          }
00875 
00876          /* send event to ODB if RO_ODB flag is set or history is on. Do not
00877             send SLOW events since the class driver does that */
00878          if ((eq_info->read_on & RO_ODB) ||
00879              (eq_info->history > 0 && (eq_info->eq_type & ~EQ_SLOW))) {
00880             update_odb(pevent, equipment[index].hkey_variables, equipment[index].format);
00881             equipment[index].odb_out++;
00882          }
00883       }
00884 
00885       equipment[index].bytes_sent += pevent->data_size + sizeof(EVENT_HEADER);
00886       equipment[index].events_sent++;
00887 
00888       equipment[index].stats.events_sent += equipment[index].events_sent;
00889       equipment[index].events_sent = 0;
00890    } else
00891       equipment[index].serial_number--;
00892 
00893    /* emtpy event buffer */
00894 #ifdef USE_EVENT_CHANNEL
00895    if ((status = dm_area_flush()) != CM_SUCCESS)
00896       cm_msg(MERROR, "send_event", "dm_area_flush: %i", status);
00897 #endif
00898 
00899    for (i = 0; equipment[i].name[0]; i++)
00900       if (equipment[i].buffer_handle) {
00901          status = bm_flush_cache(equipment[i].buffer_handle, SYNC);
00902          if (status != BM_SUCCESS) {
00903             cm_msg(MERROR, "send_event", "bm_flush_cache(SYNC) error %d", status);
00904             return status;
00905          }
00906       }
00907 
00908    return CM_SUCCESS;
00909 }
00910 
00911 /*------------------------------------------------------------------*/
00912 
00913 void send_all_periodic_events(INT transition)
00914 {
00915    EQUIPMENT_INFO *eq_info;
00916    INT i;
00917 
00918    for (i = 0; equipment[i].name[0]; i++) {
00919       eq_info = &equipment[i].info;
00920 
00921       if (!eq_info->enabled || equipment[i].status != FE_SUCCESS)
00922          continue;
00923 
00924       if (transition == TR_START && (eq_info->read_on & RO_BOR) == 0)
00925          continue;
00926       if (transition == TR_STOP && (eq_info->read_on & RO_EOR) == 0)
00927          continue;
00928       if (transition == TR_PAUSE && (eq_info->read_on & RO_PAUSE) == 0)
00929          continue;
00930       if (transition == TR_RESUME && (eq_info->read_on & RO_RESUME) == 0)
00931          continue;
00932 
00933       send_event(i);
00934    }
00935 }
00936 
00937 /*------------------------------------------------------------------*/
00938 
00939 BOOL interrupt_enabled;
00940 
00941 void interrupt_enable(BOOL flag)
00942 {
00943    interrupt_enabled = flag;
00944 
00945    if (interrupt_eq) {
00946       if (interrupt_enabled)
00947          interrupt_configure(CMD_INTERRUPT_ENABLE, 0, 0);
00948       else
00949          interrupt_configure(CMD_INTERRUPT_DISABLE, 0, 0);
00950    }
00951 }
00952 
00953 /*------------------------------------------------------------------*/
00954 
00955 void interrupt_routine(void)
00956 {
00957    EVENT_HEADER *pevent;
00958 
00959    /* get pointer for upcoming event.
00960       This is a blocking call if no space available */
00961    if ((pevent = dm_pointer_get()) == NULL)
00962       cm_msg(MERROR, "interrupt_routine", "interrupt, dm_pointer_get returned NULL");
00963 
00964    /* compose MIDAS event header */
00965    pevent->event_id = interrupt_eq->info.event_id;
00966    pevent->trigger_mask = interrupt_eq->info.trigger_mask;
00967    pevent->data_size = 0;
00968    pevent->time_stamp = actual_time;
00969    pevent->serial_number = interrupt_eq->serial_number++;
00970 
00971    /* call user readout routine */
00972    pevent->data_size = interrupt_eq->readout((char *) (pevent + 1), 0);
00973 
00974    /* send event */
00975    if (pevent->data_size) {
00976       interrupt_eq->bytes_sent += pevent->data_size + sizeof(EVENT_HEADER);
00977       interrupt_eq->events_sent++;
00978 
00979       if (interrupt_eq->buffer_handle) {
00980 #ifdef USE_EVENT_CHANNEL
00981          dm_pointer_increment(interrupt_eq->buffer_handle,
00982                               pevent->data_size + sizeof(EVENT_HEADER));
00983 #else
00984          rpc_send_event(interrupt_eq->buffer_handle, pevent,
00985                         pevent->data_size + sizeof(EVENT_HEADER), SYNC);
00986 #endif
00987       }
00988 
00989       /* send event to ODB */
00990       if (interrupt_eq->info.read_on & RO_ODB || interrupt_eq->info.history) {
00991          if (actual_millitime - interrupt_eq->last_called > ODB_UPDATE_TIME) {
00992             interrupt_eq->last_called = actual_millitime;
00993             memcpy(interrupt_odb_buffer, pevent,
00994                    pevent->data_size + sizeof(EVENT_HEADER));
00995             interrupt_odb_buffer_valid = TRUE;
00996             interrupt_eq->odb_out++;
00997          }
00998       }
00999    } else
01000       interrupt_eq->serial_number--;
01001 
01002 }
01003 
01004 /*------------------------------------------------------------------*/
01005 
/*
   Print a message on screen line 20 via ss_printf().

   A leading "[...]" tag is stripped together with the one character
   following the closing bracket. The text is copied into a line that
   is pre-filled with blanks (159 characters), so that a shorter message
   overwrites the remainder of a previously printed longer one. Messages
   longer than 159 characters are truncated.

   msg     message text; NULL is treated as an empty message
   return  always 0
*/
int message_print(const char *msg)
{
   char str[160];
   const char *bracket;
   size_t len;

   /* blank-filled, zero-terminated output line */
   memset(str, ' ', sizeof(str) - 1);
   str[sizeof(str) - 1] = 0;

   if (msg == NULL)
      msg = "";

   /* skip "[tag]" and the separator behind it, but never run past the
      end of the string or rely on a ']' actually being present */
   if (msg[0] == '[') {
      bracket = strchr(msg, ']');
      if (bracket != NULL) {
         msg = bracket + 1;
         if (*msg)
            msg++;
      }
   }

   /* copy without the terminator to keep the blank padding; clamp to
      the buffer size */
   len = strlen(msg);
   if (len > sizeof(str) - 1)
      len = sizeof(str) - 1;
   memcpy(str, msg, len);

   /* print as data, not as a format string: the message may contain '%' */
   ss_printf(0, 20, "%s", str);

   return 0;
}
01021 
01022 /*------------------------------------------------------------------*/
01023 
01024 void display(BOOL bInit)
01025 {
01026    INT i, status;
01027    time_t full_time;
01028    char str[30];
01029 
01030    if (bInit) {
01031       ss_clear_screen();
01032 
01033       if (host_name[0])
01034          strcpy(str, host_name);
01035       else
01036          strcpy(str, "<local>");
01037 
01038       ss_printf(0, 0, "%s connected to %s. Press \"!\" to exit", full_frontend_name, str);
01039       ss_printf(0, 1,
01040                 "================================================================================");
01041       ss_printf(0, 2, "Run status:   %s",
01042                 run_state == STATE_STOPPED ? "Stopped" : run_state ==
01043                 STATE_RUNNING ? "Running" : "Paused");
01044       ss_printf(25, 2, "Run number %d   ", run_number);
01045       ss_printf(0, 3,
01046                 "================================================================================");
01047       ss_printf(0, 4,
01048                 "Equipment     Status     Events     Events/sec Rate[kB/s] ODB->FE    FE->ODB");
01049       ss_printf(0, 5,
01050                 "--------------------------------------------------------------------------------");
01051       for (i = 0; equipment[i].name[0]; i++)
01052          ss_printf(0, i + 6, "%s", equipment[i].name);
01053    }
01054 
01055    /* display time */
01056    time(&full_time);
01057    strcpy(str, ctime(&full_time) + 11);
01058    str[8] = 0;
01059    ss_printf(72, 0, "%s", str);
01060 
01061    for (i = 0; equipment[i].name[0]; i++) {
01062       status = equipment[i].status;
01063 
01064       if ((status == 0 || status == FE_SUCCESS) && equipment[i].info.enabled)
01065          ss_printf(14, i + 6, "OK       ");
01066       else if (!equipment[i].info.enabled)
01067          ss_printf(14, i + 6, "Disabled ");
01068       else if (status == FE_ERR_ODB)
01069          ss_printf(14, i + 6, "ODB Error");
01070       else if (status == FE_ERR_HW)
01071          ss_printf(14, i + 6, "HW Error ");
01072       else if (status == FE_ERR_DISABLED)
01073          ss_printf(14, i + 6, "Disabled ");
01074       else
01075          ss_printf(14, i + 6, "Unknown  ");
01076 
01077       if (equipment[i].stats.events_sent > 1E9)
01078          ss_printf(25, i + 6, "%1.3lfG     ", equipment[i].stats.events_sent / 1E9);
01079       else if (equipment[i].stats.events_sent > 1E6)
01080          ss_printf(25, i + 6, "%1.3lfM     ", equipment[i].stats.events_sent / 1E6);
01081       else
01082          ss_printf(25, i + 6, "%1.0lf      ", equipment[i].stats.events_sent);
01083       ss_printf(36, i + 6, "%1.1lf      ", equipment[i].stats.events_per_sec);
01084       ss_printf(47, i + 6, "%1.1lf      ", equipment[i].stats.kbytes_per_sec);
01085       ss_printf(58, i + 6, "%ld       ", equipment[i].odb_in);
01086       ss_printf(69, i + 6, "%ld       ", equipment[i].odb_out);
01087    }
01088 
01089    /* go to next line */
01090    ss_printf(0, i + 6, "");
01091 }
01092 
01093 /*------------------------------------------------------------------*/
01094 
01095 BOOL logger_root()
01096 /* check if logger uses ROOT format */
01097 {
01098    int size, i, status;
01099    char str[80];
01100    HNDLE hKeyRoot, hKey;
01101 
01102    if (db_find_key(hDB, 0, "/Logger/Channels", &hKeyRoot) == DB_SUCCESS) {
01103       for (i = 0;; i++) {
01104          status = db_enum_key(hDB, hKeyRoot, i, &hKey);
01105          if (status == DB_NO_MORE_SUBKEYS)
01106             break;
01107 
01108          strcpy(str, "MIDAS");
01109          size = sizeof(str);
01110          db_get_value(hDB, hKey, "Settings/Format", str, &size, TID_STRING, TRUE);
01111 
01112          if (equal_ustring(str, "ROOT"))
01113             return TRUE;
01114       }
01115    }
01116 
01117    return FALSE;
01118 }
01119 
01120 /*------------------------------------------------------------------*/
01121 
01122 INT scheduler(void)
01123 {
01124    EQUIPMENT_INFO *eq_info;
01125    EQUIPMENT *eq;
01126    EVENT_HEADER *pevent;
01127    DWORD last_time_network = 0, last_time_display = 0, last_time_flush = 0, readout_start;
01128    INT i, j, index, status, ch, source, size, state;
01129    char str[80];
01130    BOOL buffer_done, flag, force_update = FALSE;
01131 
01132    INT opt_max = 0, opt_index = 0, opt_tcp_size = 128, opt_cnt = 0;
01133    INT err;
01134 
01135 #ifdef OS_VXWORKS
01136    rpc_set_opt_tcp_size(1024);
01137 #ifdef PPCxxx
01138    rpc_set_opt_tcp_size(NET_TCP_SIZE);
01139 #endif
01140 #endif
01141 
01142    /*----------------- MAIN equipment loop ------------------------------*/
01143 
01144    do {
01145       actual_millitime = ss_millitime();
01146       actual_time = ss_time();
01147 
01148       /*---- loop over equipment table -------------------------------*/
01149       for (index = 0;; index++) {
01150          eq = &equipment[index];
01151          eq_info = &eq->info;
01152 
01153          /* check if end of equipment list */
01154          if (!eq->name[0])
01155             break;
01156 
01157          if (!eq_info->enabled)
01158             continue;
01159 
01160          if (eq->status != FE_SUCCESS)
01161             continue;
01162 
01163          /*---- call idle routine for slow control equipment ----*/
01164          if ((eq_info->eq_type & EQ_SLOW) && eq->status == FE_SUCCESS) {
01165             if (eq_info->event_limit > 0) {
01166                if (actual_millitime - eq->last_idle >= (DWORD) eq_info->event_limit) {
01167                   eq->cd(CMD_IDLE, eq);
01168                   eq->last_idle = actual_millitime;
01169                }
01170             } else
01171                eq->cd(CMD_IDLE, eq);
01172          }
01173 
01174          if (run_state == STATE_STOPPED && (eq_info->read_on & RO_STOPPED) == 0)
01175             continue;
01176          if (run_state == STATE_PAUSED && (eq_info->read_on & RO_PAUSED) == 0)
01177             continue;
01178          if (run_state == STATE_RUNNING && (eq_info->read_on & RO_RUNNING) == 0)
01179             continue;
01180 
01181          /*---- check periodic events ----*/
01182          if ((eq_info->eq_type & EQ_PERIODIC) || (eq_info->eq_type & EQ_SLOW)) {
01183             if (eq_info->period == 0)
01184                continue;
01185 
01186             /* check if period over */
01187             if (actual_millitime - eq->last_called >= (DWORD) eq_info->period) {
01188                /* disable interrupts during readout */
01189                interrupt_enable(FALSE);
01190 
01191                /* readout and send event */
01192                status = send_event(index);
01193 
01194                if (status != CM_SUCCESS) {
01195                   cm_msg(MERROR, "scheduler", "send_event error %d", status);
01196                   goto net_error;
01197                }
01198 
01199                /* re-enable the interrupt after periodic */
01200                interrupt_enable(TRUE);
01201             }
01202          }
01203 
01204          /* end of periodic equipments */
01205          /*---- check polled events ----*/
01206          if (eq_info->eq_type & EQ_POLLED) {
01207             readout_start = actual_millitime;
01208             pevent = NULL;
01209 
01210             while ((source = poll_event(eq_info->source, eq->poll_count, FALSE)) > 0) {
01211                pevent = dm_pointer_get();
01212                if (pevent == NULL) {
01213                   cm_msg(MERROR, "scheduler",
01214                          "polled, dm_pointer_get not returning valid pointer");
01215                   status = SS_NO_MEMORY;
01216                   goto net_error;
01217                }
01218 
01219                /* compose MIDAS event header */
01220                pevent->event_id = eq_info->event_id;
01221                pevent->trigger_mask = eq_info->trigger_mask;
01222                pevent->data_size = 0;
01223                pevent->time_stamp = actual_time;
01224                pevent->serial_number = eq->serial_number;
01225 
01226                /* put source at beginning of event, will be overwritten by 
01227                   user readout code, just a special feature used by some 
01228                   multi-source applications */
01229                *(INT *) (pevent + 1) = source;
01230 
01231                if (eq->info.num_subevents) {
01232                   eq->subevent_number = 0;
01233                   do {
01234                      *(INT *) ((char *) (pevent + 1) + pevent->data_size) = source;
01235 
01236                      /* call user readout routine for subevent indicating offset */
01237                      size = eq->readout((char *) (pevent + 1), pevent->data_size);
01238                      pevent->data_size += size;
01239                      if (size > 0) {
01240                         if (pevent->data_size + sizeof(EVENT_HEADER) >
01241                             (DWORD) max_event_size) {
01242                            cm_msg(MERROR, "scheduler",
01243                                   "Event size %ld larger than maximum size %d",
01244                                   (long) (pevent->data_size + sizeof(EVENT_HEADER)),
01245                                   max_event_size);
01246                         }
01247 
01248                         eq->subevent_number++;
01249                         eq->serial_number++;
01250                      }
01251 
01252                      /* wait for next event */
01253                      do {
01254                         source = poll_event(eq_info->source, eq->poll_count, FALSE);
01255 
01256                         if (source == FALSE) {
01257                            actual_millitime = ss_millitime();
01258 
01259                            /* repeat no more than period */
01260                            if (actual_millitime - readout_start > (DWORD) eq_info->period)
01261                               break;
01262                         }
01263                      } while (source == FALSE);
01264 
01265                   } while (eq->subevent_number < eq->info.num_subevents && source);
01266 
01267                   /* notify readout routine about end of super-event */
01268                   pevent->data_size = eq->readout((char *) (pevent + 1), -1);
01269                } else {
01270                   /* call user readout routine indicating event source */
01271                   pevent->data_size = eq->readout((char *) (pevent + 1), 0);
01272 
01273                   /* check event size */
01274                   if (pevent->data_size + sizeof(EVENT_HEADER) > (DWORD) max_event_size) {
01275                      cm_msg(MERROR, "scheduler",
01276                             "Event size %ld larger than maximum size %d",
01277                             (long) (pevent->data_size + sizeof(EVENT_HEADER)),
01278                             max_event_size);
01279                   }
01280 
01281                   /* increment serial number if event read out sucessfully */
01282                   if (pevent->data_size)
01283                      eq->serial_number++;
01284                }
01285 
01286                /* send event */
01287                if (pevent->data_size) {
01288                   if (eq->buffer_handle) {
01289                      /* send first event to ODB if logger writes in root format */
01290                      if (pevent->serial_number == 1)
01291                         if (logger_root())
01292                            update_odb(pevent, eq->hkey_variables, eq->format);
01293 
01294 #ifdef USE_EVENT_CHANNEL
01295                      dm_pointer_increment(eq->buffer_handle,
01296                                           pevent->data_size + sizeof(EVENT_HEADER));
01297 #else
01298                      status = rpc_send_event(eq->buffer_handle, pevent,
01299                                              pevent->data_size + sizeof(EVENT_HEADER),
01300                                              SYNC);
01301 
01302                      if (status != SUCCESS) {
01303                         cm_msg(MERROR, "scheduler", "rpc_send_event error %d", status);
01304                         goto net_error;
01305                      }
01306 #endif
01307 
01308                      eq->bytes_sent += pevent->data_size + sizeof(EVENT_HEADER);
01309 
01310                      if (eq->info.num_subevents)
01311                         eq->events_sent += eq->subevent_number;
01312                      else
01313                         eq->events_sent++;
01314                   }
01315                }
01316 
01317                actual_millitime = ss_millitime();
01318 
01319                /* repeat no more than period */
01320                if (actual_millitime - readout_start > (DWORD) eq_info->period)
01321                   break;
01322 
01323                /* quit if event limit is reached */
01324                if (eq_info->event_limit > 0 &&
01325                    eq->stats.events_sent + eq->events_sent >= eq_info->event_limit)
01326                   break;
01327             }
01328 
01329             /* send event to ODB */
01330             if (pevent && (eq_info->read_on & RO_ODB || eq_info->history)) {
01331                if (actual_millitime - eq->last_called > ODB_UPDATE_TIME && pevent != NULL) {
01332                   eq->last_called = actual_millitime;
01333                   update_odb(pevent, eq->hkey_variables, eq->format);
01334                   eq->odb_out++;
01335                }
01336             }
01337          }
01338 
01339          /*---- send interrupt events ----*/
01340          if (eq_info->eq_type & EQ_INTERRUPT) {
01341             /* not much to do as work being done independently in interrupt_routine() */
01342 
01343             /* update ODB */
01344             if (interrupt_odb_buffer_valid) {
01345                update_odb(interrupt_odb_buffer, interrupt_eq->hkey_variables,
01346                           interrupt_eq->format);
01347                interrupt_odb_buffer_valid = FALSE;
01348             }
01349 
01350          }
01351 
01352          /*---- check if event limit is reached ----*/
01353          if (eq_info->eq_type != EQ_SLOW &&
01354              eq_info->event_limit > 0 &&
01355              eq->stats.events_sent + eq->events_sent >= eq_info->event_limit &&
01356              run_state == STATE_RUNNING) {
01357             /* stop run */
01358             if (cm_transition(TR_STOP, 0, str, sizeof(str), SYNC, FALSE) != CM_SUCCESS)
01359                cm_msg(MERROR, "scheduler", "cannot stop run: %s", str);
01360 
01361             /* check if autorestart, main loop will take care of it */
01362             size = sizeof(BOOL);
01363             flag = FALSE;
01364             db_get_value(hDB, 0, "/Logger/Auto restart", &flag, &size, TID_BOOL, TRUE);
01365 
01366             if (flag)
01367                auto_restart = ss_time() + 20;   /* restart in 20 sec. */
01368 
01369             /* update event display correctly */
01370             force_update = TRUE;
01371          }
01372       }
01373 
01374       /*---- call frontend_loop periodically -------------------------*/
01375       if (frontend_call_loop) {
01376          status = frontend_loop();
01377          if (status == RPC_SHUTDOWN || status == SS_ABORT) {
01378             status = RPC_SHUTDOWN;
01379             break;
01380          }
01381       }
01382 
01383       /*---- check for deferred transitions --------------------------*/
01384       cm_check_deferred_transition();
01385 
01386       /*---- check for manual triggered events -----------------------*/
01387       if (manual_trigger_event_id) {
01388          interrupt_enable(FALSE);
01389 
01390          /* readout and send event */
01391          status = BM_INVALID_PARAM;
01392          for (i = 0; equipment[i].name[0]; i++)
01393             if (equipment[i].info.event_id == manual_trigger_event_id) {
01394                status = send_event(i);
01395                break;
01396             }
01397 
01398          manual_trigger_event_id = 0;
01399 
01400          if (status != CM_SUCCESS) {
01401             cm_msg(MERROR, "scheduler", "send_event error %d", status);
01402             goto net_error;
01403          }
01404 
01405          /* re-enable the interrupt after periodic */
01406          interrupt_enable(TRUE);
01407       }
01408 
01409       /*---- calculate rates and update status page periodically -----*/
01410       if (force_update ||
01411           (display_period
01412            && actual_millitime - last_time_display > (DWORD) display_period)
01413           || (!display_period && actual_millitime - last_time_display > 3000)) {
01414          force_update = FALSE;
01415 
01416          /* calculate rates */
01417          if (actual_millitime != last_time_display) {
01418             max_bytes_per_sec = 0;
01419             for (i = 0; equipment[i].name[0]; i++) {
01420                eq = &equipment[i];
01421                eq->stats.events_sent += eq->events_sent;
01422                eq->stats.events_per_sec =
01423                    eq->events_sent / ((actual_millitime - last_time_display) / 1000.0);
01424                eq->stats.kbytes_per_sec =
01425                    eq->bytes_sent / 1024.0 / ((actual_millitime - last_time_display) /
01426                                               1000.0);
01427 
01428                if ((INT) eq->bytes_sent > max_bytes_per_sec)
01429                   max_bytes_per_sec = eq->bytes_sent;
01430 
01431                eq->bytes_sent = 0;
01432                eq->events_sent = 0;
01433             }
01434 
01435             max_bytes_per_sec = (DWORD)
01436                 ((double) max_bytes_per_sec /
01437                  ((actual_millitime - last_time_display) / 1000.0));
01438 
01439             /* tcp buffer size evaluation */
01440             if (optimize) {
01441                opt_max = MAX(opt_max, (INT) max_bytes_per_sec);
01442                ss_printf(0, opt_index, "%6d : %5.1lf %5.1lf", opt_tcp_size,
01443                          opt_max / 1024.0, max_bytes_per_sec / 1024.0);
01444                if (++opt_cnt == 10) {
01445                   opt_cnt = 0;
01446                   opt_max = 0;
01447                   opt_index++;
01448                   opt_tcp_size = 1 << (opt_index + 7);
01449                   rpc_set_opt_tcp_size(opt_tcp_size);
01450                   if (1 << (opt_index + 7) > 0x8000) {
01451                      opt_index = 0;
01452                      opt_tcp_size = 1 << 7;
01453                      rpc_set_opt_tcp_size(opt_tcp_size);
01454                   }
01455                }
01456             }
01457 
01458          }
01459 
01460          /* propagate changes in equipment to ODB */
01461          rpc_set_option(-1, RPC_OTRANSPORT, RPC_FTCP);
01462          db_send_changed_records();
01463          rpc_set_option(-1, RPC_OTRANSPORT, RPC_TCP);
01464 
01465          if (display_period) {
01466             display(FALSE);
01467 
01468             /* check keyboard */
01469             ch = 0;
01470             status = 0;
01471             while (ss_kbhit()) {
01472                ch = ss_getchar(0);
01473                if (ch == -1)
01474                   ch = getchar();
01475 
01476                if (ch == '!')
01477                   status = RPC_SHUTDOWN;
01478             }
01479 
01480             if (ch > 0)
01481                display(TRUE);
01482             if (status == RPC_SHUTDOWN)
01483                break;
01484          }
01485 
01486          last_time_display = actual_millitime;
01487       }
01488 
01489       /*---- check to flush cache ------------------------------------*/
01490       if (actual_millitime - last_time_flush > 1000) {
01491          last_time_flush = actual_millitime;
01492 
01493          /* if cache on server is not filled in one second at current
01494             data rate, flush it now to make events available to consumers */
01495 
01496          if (max_bytes_per_sec < SERVER_CACHE_SIZE) {
01497             interrupt_enable(FALSE);
01498 
01499 #ifdef USE_EVENT_CHANNEL
01500             if ((status = dm_area_flush()) != CM_SUCCESS)
01501                cm_msg(MERROR, "scheduler", "dm_area_flush: %i", status);
01502 #endif
01503 
01504             for (i = 0; equipment[i].name[0]; i++) {
01505                if (equipment[i].buffer_handle) {
01506                   /* check if buffer already flushed */
01507                   buffer_done = FALSE;
01508                   for (j = 0; j < i; j++)
01509                      if (equipment[i].buffer_handle == equipment[j].buffer_handle) {
01510                         buffer_done = TRUE;
01511                         break;
01512                      }
01513 
01514                   if (!buffer_done) {
01515                      rpc_set_option(-1, RPC_OTRANSPORT, RPC_FTCP);
01516                      rpc_flush_event();
01517                      err = bm_flush_cache(equipment[i].buffer_handle, ASYNC);
01518                      if (err != BM_SUCCESS) {
01519                         cm_msg(MERROR, "scheduler", "bm_flush_cache(ASYNC) error %d",
01520                                err);
01521                         return err;
01522                      }
01523                      rpc_set_option(-1, RPC_OTRANSPORT, RPC_TCP);
01524                   }
01525                }
01526             }
01527             interrupt_enable(TRUE);
01528          }
01529       }
01530 
01531       /*---- check for auto restart --------------------------------*/
01532       if (auto_restart > 0 && ss_time() > auto_restart) {
01533          /* check if really stopped */
01534          size = sizeof(state);
01535          status = db_get_value(hDB, 0, "Runinfo/State", &state, &size, TID_INT, TRUE);
01536          if (status != DB_SUCCESS)
01537             cm_msg(MERROR, "scheduler", "cannot get Runinfo/State in database");
01538 
01539          if (state == STATE_STOPPED) {
01540             auto_restart = 0;
01541             size = sizeof(run_number);
01542             status =
01543                 db_get_value(hDB, 0, "/Runinfo/Run number", &run_number, &size, TID_INT,
01544                              TRUE);
01545             assert(status == SUCCESS);
01546 
01547             if (run_number <= 0) {
01548                cm_msg(MERROR, "main", "aborting on attempt to use invalid run number %d",
01549                       run_number);
01550                abort();
01551             }
01552 
01553             cm_msg(MTALK, "main", "starting new run");
01554             status = cm_transition(TR_START, run_number + 1, NULL, 0, SYNC, FALSE);
01555             if (status != CM_SUCCESS)
01556                cm_msg(MERROR, "main", "cannot restart run");
01557          }
01558       }
01559 
01560       /*---- check network messages ----------------------------------*/
01561       if ((run_state == STATE_RUNNING && interrupt_eq == NULL) || slowcont_eq) {
01562          /* only call yield once every 100ms when running */
01563          if (actual_millitime - last_time_network > 100) {
01564             status = cm_yield(0);
01565             last_time_network = actual_millitime;
01566          } else
01567             status = RPC_SUCCESS;
01568       } else
01569          /* when run is stopped or interrupts used, 
01570             call yield with 100ms timeout */
01571          status = cm_yield(100);
01572 
01573       /* exit for VxWorks */
01574       if (fe_stop)
01575          status = RPC_SHUTDOWN;
01576 
01577       /* exit if CTRL-C pressed */
01578       if (cm_is_ctrlc_pressed())
01579          status = RPC_SHUTDOWN;
01580 
01581    } while (status != RPC_SHUTDOWN && status != SS_ABORT);
01582 
01583  net_error:
01584 
01585    return status;
01586 }
01587 
01588 /*------------------------------------------------------------------*/
01589 
01590 INT get_frontend_index()
01591 {
01592    return frontend_index;
01593 }
01594 
01595 /*------------------------------------------------------------------*/
01596 
01597 #ifdef OS_VXWORKS
01598 int mfe(char *ahost_name, char *aexp_name, BOOL adebug)
01599 #else
01600 int main(int argc, char *argv[])
01601 #endif
01602 {
01603    INT status, i, dm_size;
01604    INT daemon;
01605 
01606    host_name[0] = 0;
01607    exp_name[0] = 0;
01608    debug = FALSE;
01609    daemon = 0;
01610 
01611    setbuf(stdout, 0);
01612    setbuf(stderr, 0);
01613 
01614 #ifdef SIGPIPE
01615    signal(SIGPIPE, SIG_IGN);
01616 #endif
01617 
01618 #ifdef OS_VXWORKS
01619    if (ahost_name)
01620       strcpy(host_name, ahost_name);
01621    if (aexp_name)
01622       strcpy(exp_name, aexp_name);
01623    debug = adebug;
01624 #else
01625 
01626    /* get default from environment */
01627    cm_get_environment(host_name, sizeof(host_name), exp_name, sizeof(exp_name));
01628 
01629    /* parse command line parameters */
01630    for (i = 1; i < argc; i++) {
01631       if (argv[i][0] == '-' && argv[i][1] == 'd')
01632          debug = TRUE;
01633       else if (argv[i][0] == '-' && argv[i][1] == 'D')
01634          daemon = 1;
01635       else if (argv[i][0] == '-' && argv[i][1] == 'O')
01636          daemon = 2;
01637       else if (argv[i][0] == '-') {
01638          if (i + 1 >= argc || argv[i + 1][0] == '-')
01639             goto usage;
01640          if (argv[i][1] == 'e')
01641             strcpy(exp_name, argv[++i]);
01642          else if (argv[i][1] == 'h')
01643             strcpy(host_name, argv[++i]);
01644          else if (argv[i][1] == 'i')
01645             frontend_index = atoi(argv[++i]);
01646          else {
01647           usage:
01648             printf
01649                 ("usage: frontend [-h Hostname] [-e Experiment] [-d] [-D] [-O] [-i n]\n");
01650             printf("         [-d]     Used to debug the frontend\n");
01651             printf("         [-D]     Become a daemon\n");
01652             printf("         [-O]     Become a daemon but keep stdout\n");
01653             printf("         [-i n]   Set frontend index (used for event building)\n");
01654             return 0;
01655          }
01656       }
01657    }
01658 #endif
01659 
01660    /* check event and buffer sizes */
01661    if (event_buffer_size < 2 * max_event_size) {
01662       printf("event_buffer_size too small for max. event size\n");
01663       ss_sleep(5000);
01664       return 1;
01665    }
01666 
01667    if (max_event_size > MAX_EVENT_SIZE) {
01668       printf("Requested max_event_size (%d) exceeds max. system event size (%d)",
01669              max_event_size, MAX_EVENT_SIZE);
01670       ss_sleep(5000);
01671       return 1;
01672    }
01673 
01674    dm_size = event_buffer_size;
01675 
01676 #ifdef OS_VXWORKS
01677    /* override dm_size in case of VxWorks
01678       take remaining free memory and use 20% of it for dm_ */
01679    dm_size = 2 * 10 * (max_event_size + sizeof(EVENT_HEADER) + sizeof(INT));
01680    if (dm_size > memFindMax()) {
01681       cm_msg(MERROR, "mainFE", "Not enough mem space for event size");
01682       return 0;
01683    }
01684    /* takes overall 20% of the available memory resource for dm_() */
01685    dm_size = 0.2 * memFindMax();
01686 
01687    /* there are two buffers */
01688    dm_size /= 2;
01689 #endif
01690 
01691    /* reduce memory size for MS-DOS */
01692 #ifdef OS_MSDOS
01693    if (dm_size > 0x4000)
01694       dm_size = 0x4000;         /* 16k */
01695 #endif
01696 
01697    /* add frontend index to frontend name if present */
01698    strcpy(full_frontend_name, frontend_name);
01699    if (frontend_index >= 0)
01700       sprintf(full_frontend_name + strlen(full_frontend_name), "%02d", frontend_index);
01701 
01702    /* inform user of settings */
01703    printf("Frontend name          :     %s\n", full_frontend_name);
01704    printf("Event buffer size      :     %d\n", event_buffer_size);
01705    printf("Buffer allocation      : 2 x %d\n", dm_size);
01706    printf("System max event size  :     %d\n", MAX_EVENT_SIZE);
01707    printf("User max event size    :     %d\n", max_event_size);
01708    if (max_event_size_frag > 0)
01709       printf("User max frag. size    :     %d\n", max_event_size_frag);
01710    printf("# of events per buffer :     %d\n\n", dm_size / max_event_size);
01711 
01712    if (daemon) {
01713       printf("\nBecoming a daemon...\n");
01714       ss_daemon_init(daemon == 2);
01715    }
01716 
01717    /* now connect to server */
01718    if (display_period) {
01719       if (host_name[0])
01720          printf("Connect to experiment %s on host %s...", exp_name, host_name);
01721       else
01722          printf("Connect to experiment %s...", exp_name);
01723    }
01724 
01725    status = cm_connect_experiment1(host_name, exp_name, full_frontend_name,
01726                                    NULL, DEFAULT_ODB_SIZE, DEFAULT_FE_TIMEOUT);
01727    if (status != CM_SUCCESS) {
01728       /* let user read message before window might close */
01729       ss_sleep(5000);
01730       return 1;
01731    }
01732 
01733    if (display_period)
01734       printf("OK\n");
01735 
01736    /* book buffer space */
01737    status = dm_buffer_create(dm_size, max_event_size);
01738    if (status != CM_SUCCESS) {
01739       printf("dm_buffer_create: Not enough memory or event too big\n");
01740       return 1;
01741    }
01742 
01743    /* remomve any dead frontend */
01744    cm_cleanup(full_frontend_name, FALSE);
01745 
01746    /* shutdown previous frontend */
01747    status = cm_shutdown(full_frontend_name, FALSE);
01748    if (status == CM_SUCCESS && display_period) {
01749       printf("Previous frontend stopped\n");
01750 
01751       /* let user read message */
01752       ss_sleep(3000);
01753    }
01754 
01755    /* register transition callbacks */
01756    if (cm_register_transition(TR_START, tr_start, 500) != CM_SUCCESS ||
01757        cm_register_transition(TR_STOP, tr_stop, 500) != CM_SUCCESS ||
01758        cm_register_transition(TR_PAUSE, tr_pause, 500) != CM_SUCCESS ||
01759        cm_register_transition(TR_RESUME, tr_resume, 500) != CM_SUCCESS) {
01760       printf("Failed to start local RPC server");
01761       cm_disconnect_experiment();
01762       dm_buffer_release();
01763 
01764       /* let user read message before window might close */
01765       ss_sleep(5000);
01766       return 1;
01767    }
01768 
01769    cm_get_experiment_database(&hDB, &status);
01770 
01771    /* set time from server */
01772 #ifdef OS_VXWORKS
01773    cm_synchronize(NULL);
01774 #endif
01775 
01776    /* turn off watchdog if in debug mode */
01777    if (debug)
01778       cm_set_watchdog_params(TRUE, 0);
01779 
01780    /* increase RPC timeout to 2min for logger with exabyte or blocked disk */
01781    rpc_set_option(-1, RPC_OTIMEOUT, 120000);
01782 
01783    /* set own message print function */
01784    if (display_period)
01785       cm_set_msg_print(MT_ALL, MT_ALL, message_print);
01786 
01787    /* call user init function */
01788    if (display_period)
01789       printf("Init hardware...");
01790    if (frontend_init() != SUCCESS) {
01791       if (display_period)
01792          printf("\n");
01793       cm_disconnect_experiment();
01794       dm_buffer_release();
01795 
01796       /* let user read message before window might close */
01797       ss_sleep(5000);
01798       return 1;
01799    }
01800 
01801    /* reqister equipment in ODB */
01802    if (register_equipment() != SUCCESS) {
01803       if (display_period)
01804          printf("\n");
01805       cm_disconnect_experiment();
01806       dm_buffer_release();
01807 
01808       /* let user read message before window might close */
01809       ss_sleep(5000);
01810       return 1;
01811    }
01812 
01813    if (display_period)
01814       printf("OK\n");
01815 
01816    /* initialize screen display */
01817    if (display_period) {
01818       ss_sleep(1000);
01819       display(TRUE);
01820    }
01821 
01822    /* switch on interrupts if running */
01823    if (interrupt_eq && run_state == STATE_RUNNING)
01824       interrupt_enable(TRUE);
01825 
01826    /* initialize ss_getchar */
01827    ss_getchar(0);
01828 
01829    /* call main scheduler loop */
01830    status = scheduler();
01831 
01832    /* reset terminal */
01833    ss_getchar(TRUE);
01834 
01835    /* switch off interrupts */
01836    if (interrupt_eq) {
01837       interrupt_configure(CMD_INTERRUPT_DISABLE, 0, 0);
01838       interrupt_configure(CMD_INTERRUPT_DETACH, 0, 0);
01839       if (interrupt_odb_buffer)
01840          free(interrupt_odb_buffer);
01841    }
01842 
01843    /* detach interrupts */
01844    if (interrupt_eq != NULL)
01845       interrupt_configure(CMD_INTERRUPT_DETACH, interrupt_eq->info.source, 0);
01846 
01847    /* call user exit function */
01848    frontend_exit();
01849 
01850    /* close slow control drivers */
01851    for (i = 0; equipment[i].name[0]; i++)
01852       if ((equipment[i].info.eq_type & EQ_SLOW) && equipment[i].status == FE_SUCCESS)
01853          equipment[i].cd(CMD_EXIT, &equipment[i]);
01854 
01855    /* close network connection to server */
01856    cm_disconnect_experiment();
01857 
01858    if (display_period) {
01859       if (status == RPC_SHUTDOWN) {
01860          ss_clear_screen();
01861          ss_printf(0, 0, "Frontend shut down.");
01862          ss_printf(0, 1, "");
01863       }
01864    }
01865 
01866    if (status != RPC_SHUTDOWN)
01867       printf("Network connection aborted.\n");
01868 
01869    dm_buffer_release();
01870 
01871    return 0;
01872 }

Midas DOC Version 1.9.5 ---- PSI Stefan Ritt ----
Contributions: Pierre-Andre Amaudruz - Sergio Ballestrero - Suzannah Daviel - Doxygen - Peter Green - Qing Gu - Greg Hackman - Gertjan Hofman - Paul Knowles - Rudi Meier - Glenn Moloney - Dave Morris - John M O'Donnell - Konstantin Olchanski - Renee Poutissou - Tamsen Schurman - Andreas Suter - Jan M.Wouters - Piotr Adam Zolnierczuk