nds2-client - ClientUser  0.16.8
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Pages
nds_connection_ptype.hh
1 #ifndef NDS_CONNECTION_DETAIL_PTYPE_HH
2 #define NDS_CONNECTION_DETAIL_PTYPE_HH
3 
4 #include <cerrno>
5 #include <map>
6 #include <memory>
7 #include <sstream>
8 #include <vector>
9 
10 #include "debug_stream.hh"
11 
12 #include "daq_config.h"
13 #include "daqc.h"
14 #include "daqc_internal.h"
15 #include "daqc_response.h"
16 
17 #include "nds_channel.hh"
18 #include "nds_buffer.hh"
19 
20 #include "nds_gap_handler.hh"
21 #include "nds_request_fragment.hh"
22 #include "nds_iterate_handler.hh"
23 
24 #include "nds_parameter_block.hh"
25 
26 #include "nds_channel_cache.hh"
27 
28 namespace NDS
29 {
30  namespace detail
31  {
32 
37  class basic_channel_filter
38  {
39  public:
40  basic_channel_filter( channel::data_type data_type_mask,
41  channel::sample_rate_type min_sample_rate,
42  channel::sample_rate_type max_sample_rate )
43  : data_type_mask_( data_type_mask ),
44  min_sample_rate_( min_sample_rate ),
45  max_sample_rate_( max_sample_rate )
46  {
47  }
48  bool
49  operator( )( const channel& ch )
50  {
51  return (
52  ( ( ( ch.DataType( ) & data_type_mask_ ) != 0 ) ||
53  data_type_mask_ == NDS::channel::DEFAULT_DATA_MASK ) &&
54  ( ch.SampleRate( ) <= max_sample_rate_ ) &&
55  ( ch.SampleRate( ) >= min_sample_rate_ ) );
56  }
57 
58  private:
59  channel::data_type data_type_mask_;
60  channel::sample_rate_type min_sample_rate_;
61  channel::sample_rate_type max_sample_rate_;
62  };
63 
67  class push_back_channel
68  {
69  public:
70  push_back_channel( std::vector< channel >& buffer )
71  : buffer_( buffer )
72  {
73  }
74  void
75  operator( )( const channel& ch )
76  {
77  buffer_.push_back( ch );
78  }
79 
80  private:
81  std::vector< channel >& buffer_;
82  };
83 
87  class count_channels
88  {
89  public:
90  count_channels( ) : count_( 0 )
91  {
92  }
93  void
94  operator( )( const channel& ch )
95  {
96  ++count_;
97  }
98 
99  size_t
100  count( ) const
101  {
102  return count_;
103  }
104 
105  private:
106  size_t count_;
107  };
108  }
109 
110  namespace detail
111  {
112 
113  class buffer_initializer
114  {
115  public:
116  DLL_EXPORT
117  buffer_initializer( buffer::gps_second_type gps_start,
118  buffer::gps_second_type gps_stop )
119  : gps_start( gps_start ), gps_stop( gps_stop ){};
120 
121  DLL_EXPORT
122  void reset_buffer( buffer* cur_buffer,
123  const channel& channel_info ) const;
124 
125  private:
126  buffer::gps_second_type gps_start;
127  buffer::gps_second_type gps_stop;
128  };
129 
130  class daq_accessor
131  {
132  public:
133  daq_accessor( daq_t& handle ) : handle_( handle ){};
134 
135  daq_t*
136  operator( )( )
137  {
138  return &handle_;
139  };
140 
141  private:
142  daq_t& handle_;
143  };
144 
145  //---------------------------------------------------------------------
146  // conn_p_type
147  //---------------------------------------------------------------------
148  struct conn_p_type : public std::enable_shared_from_this< conn_p_type >
149  {
150  enum class iterate_finalize_reason
151  {
152  FINISHED,
153  ABORTED
154  };
155 
156  typedef channel::channel_type channel_type;
157  typedef channel::data_type data_type;
158  typedef channel::sample_rate_type sample_rate_type;
159 
160  typedef std::map< std::string, daq_channel_t >
161  channel_mem_cache_type;
162  typedef NDS::detail::channel_cache_nds1 channel_cache_type;
163  typedef long time_type;
164 
165  typedef nds_socket_type socket_t;
166 
167  connection::host_type host;
168  connection::port_type port;
169  connection::protocol_type protocol;
170  daq_t handle;
171  daq_accessor accesor_;
172  bool connected;
173  channel_cache_type channel_cache_;
174  channel_mem_cache_type channel_mem_cache_;
175 
176  time_type request_start_time_;
177  time_type request_end_time_;
178  bool request_in_progress_;
179 
180  NDS::parameters parameters_;
181 
182  DLL_EXPORT
183  explicit conn_p_type( const NDS::parameters& params );
184 
185  DLL_EXPORT
186  ~conn_p_type( );
187 
188  void connect( );
189 
190  availability_list_type get_availability(
191  const epoch& time_span,
192  const connection::channel_names_type& channel_names );
193  availability_list_type get_availability(
194  buffer::gps_second_type gps_start,
195  buffer::gps_second_type gps_stop,
196  const connection::channel_names_type& channel_names );
197 
198  bool check( buffer::gps_second_type gps_start,
199  buffer::gps_second_type gps_stop,
200  const connection::channel_names_type& channel_names );
201 
202  bool
203  has_gaps( buffer::gps_second_type gps_start,
204  buffer::gps_second_type gps_stop,
205  const connection::channel_names_type& channel_names );
206 
207  buffers_type
208  fetch( buffer::gps_second_type gps_start,
209  buffer::gps_second_type gps_stop,
210  const connection::channel_names_type& channel_names,
211  channels_type* reference_channels = nullptr );
212 
213  size_t count_channels_nds1(
214  std::string channel_glob,
215  channel_type channel_type_mask = channel::DEFAULT_CHANNEL_MASK,
216  data_type data_type_mask = channel::DEFAULT_DATA_MASK,
217  sample_rate_type min_sample_rate = channel::MIN_SAMPLE_RATE,
218  sample_rate_type max_sample_rate = channel::MAX_SAMPLE_RATE );
219 
220  size_t count_channels_nds2(
221  std::string channel_glob,
222  channel_type channel_type_mask = channel::DEFAULT_CHANNEL_MASK,
223  data_type data_type_mask = channel::DEFAULT_DATA_MASK,
224  sample_rate_type min_sample_rate = channel::MIN_SAMPLE_RATE,
225  sample_rate_type max_sample_rate = channel::MAX_SAMPLE_RATE );
226 
227  size_t
228  count_channels(
229  std::string channel_glob,
230  channel_type channel_type_mask = channel::DEFAULT_CHANNEL_MASK,
231  data_type data_type_mask = channel::DEFAULT_DATA_MASK,
232  sample_rate_type min_sample_rate = channel::MIN_SAMPLE_RATE,
233  sample_rate_type max_sample_rate = channel::MAX_SAMPLE_RATE )
234  {
235  if ( protocol == connection::protocol_type::PROTOCOL_ONE )
236  {
237  return count_channels_nds1( channel_glob,
238  channel_type_mask,
239  data_type_mask,
240  min_sample_rate,
241  max_sample_rate );
242  }
243  return count_channels_nds2( channel_glob,
244  channel_type_mask,
245  data_type_mask,
246  min_sample_rate,
247  max_sample_rate );
248  }
249 
250  void find_channels( channels_type& output,
251  const NDS::channel_predicate_object& pred );
252 
253  void find_channels_nds1(
254  channels_type& output,
255  std::string channel_glob,
256  channel_type channel_type_mask = channel::DEFAULT_CHANNEL_MASK,
257  data_type data_type_mask = channel::DEFAULT_DATA_MASK,
258  sample_rate_type min_sample_rate = channel::MIN_SAMPLE_RATE,
259  sample_rate_type max_sample_rate = channel::MAX_SAMPLE_RATE );
260 
261  void find_channels_nds2(
262  channels_type& output,
263  std::string channel_glob,
264  channel_type channel_type_mask = channel::DEFAULT_CHANNEL_MASK,
265  data_type data_type_mask = channel::DEFAULT_DATA_MASK,
266  sample_rate_type min_sample_rate = channel::MIN_SAMPLE_RATE,
267  sample_rate_type max_sample_rate = channel::MAX_SAMPLE_RATE );
268 
269  epochs_type get_epochs( );
270 
271  bool set_epoch_if_changed( const epoch& time_span );
272  bool set_epoch( const std::string& epoch_name );
273 
274  bool set_epoch( buffer::gps_second_type gps_start,
275  buffer::gps_second_type gps_stop );
276 
277  epoch current_epoch( ) const;
278 
279  const channel::hash_type& hash( ) const;
280 
281  std::shared_ptr< detail::iterate_handler > dispatch_iterate(
282  buffer::gps_second_type gps_start,
283  buffer::gps_second_type gps_stop,
284  buffer::gps_second_type stride,
285  const connection::channel_names_type& channel_names );
286 
287  void issue_iterate(
288  buffer::gps_second_type gps_start,
289  buffer::gps_second_type gps_stop,
290  buffer::gps_second_type stride,
291  const connection::channel_names_type& channel_names,
292  std::vector< NDS::channel >* final_channel_list = nullptr );
293 
294  void finalize_iterate( detail::iterate_handler* handler,
295  iterate_finalize_reason reason );
296 
297  // bool has_next();
298  //
299  // buffers_type next();
300 
301  void next_raw_buffer( buffers_type& output );
302 
303  void shutdown( );
304 
305  void fill_gap( data_type DataType,
306  channel::size_type DataSizeType,
307  unsigned char* start,
308  unsigned char* end );
309 
310  inline time_type
311  request_start_time( ) const
312  {
313  return request_start_time_;
314  }
315 
316  inline void
317  request_start_time( time_type Value )
318  {
319  request_start_time_ = Value;
320  }
321 
322  inline time_type
323  request_end_time( ) const
324  {
325  return request_end_time_;
326  }
327 
328  inline void
329  request_end_time( time_type Value )
330  {
331  request_end_time_ = Value;
332  }
333 
334  inline bool
335  request_in_progress( ) const
336  {
337  return request_in_progress_;
338  }
339 
340  inline void
341  request_in_progress( bool Value )
342  {
343  request_in_progress_ = Value;
344  }
345 
353  NDS::buffer::gps_second_type
354  calculate_stride( NDS::buffer::gps_second_type gps_start,
355  NDS::buffer::gps_second_type gps_stop,
356  NDS::channels_type& selected_channels ) const;
357 
358  inline void
359  termination_block( )
360  {
361  //-----------------------------------------------------------------
362  // NDS1 transfers end with a 'termination block', an empty block
363  // that is indistinguisable from a 'data not found' condition.
364  // If this is an NDS1 connection, we must digest the termination
365  // block.
366  //-----------------------------------------------------------------
367  if ( ( request_in_progress( ) ) &&
368  ( protocol == connection::protocol_type::PROTOCOL_ONE ) )
369  {
370  int rc = daq_recv_next( &handle );
371 
372  if ( rc != DAQD_NOT_FOUND )
373  {
374  //-----------------------------------------------------
375  // Unexpected error
376  //-----------------------------------------------------
377  throw connection::daq_error( rc );
378  }
379  }
380  }
381 
386  NDS::buffer::gps_second_type cur_nds1_gpstime( );
387 
388  void validate( ) const;
389 
390  void validate_daq( int RetCode ) const;
391 
392  void infer_daq_channel_info( const std::string& channel_name,
393  daq_channel_t& channel,
394  time_type gps );
395 
396  void channel_mask_to_query_type_strings(
397  channel_type channel_type_mask,
398  std::vector< std::string >& queryTypes );
399 
400  // Helper functions for error messages
401  std::string get_last_message( ) const throw( );
402 
403  std::string err_msg_unexpected_no_data_found(
404  buffer::gps_second_type gps_start,
405  buffer::gps_second_type gps_stop,
406  const channel::channel_names_type& names );
407 
408  void
409  plan_fetches( buffer::gps_second_type gps_start,
410  buffer::gps_second_type gps_stop,
411  const connection::channel_names_type& channel_names,
412  buffers_type& dest_buffers,
413  request_fragments_type& retval );
414 
415  void
416  sync_parameters( )
417  {
418  detail::parameter_accessor paccess( parameters_ );
419  if ( handle.conceal )
420  {
421  handle.conceal->max_command_count = static_cast< size_t >(
422  paccess( ).max_nds1_command_count( ) );
423  }
424  }
425 
426  void cycle_nds1_connection( );
427 
428  protected:
429  epoch resolve_epoch( const std::string& epoch_name );
430  void setup_daq_chanlist(
431  buffer::gps_second_type gps_start,
432  const connection::channel_names_type& channel_names,
433  bool& have_minute_trends,
434  double& bytes_per_sample,
435  std::vector< NDS::channel >* final_channel_list = nullptr );
436 
437  void process_check_data_result( int result, bool gaps_ok );
438 
439  void fetch_fragment( request_fragment& fragment,
440  const buffer_initializer& initializer,
441  bool buffers_initialized );
442 
443  std::vector< buffers_type > fetch_available(
444  buffer::gps_second_type gps_start,
445  buffer::gps_second_type gps_stop,
446  const connection::channel_names_type& channel_names );
447 
448  std::weak_ptr< detail::iterate_handler > iterate_handler_;
449 
450  epochs_type epoch_cache_;
451  epoch current_epoch_;
452  channel::hash_type hash_;
453 
454  void load_epochs_to_cache( );
455 
456  channel _parse_nds2_get_channel_line( char* buffer );
457 
458  bool _read_uint4( socket_t fd, uint4_type* dest );
459 
460  bool _read_buffer( socket_t fd, void* dest, size_t size );
461 
462  template < typename Filter, typename Function >
463  void retreive_channels_from_nds2(
464  const std::vector< std::string >& types,
465  const std::string& channel_glob,
466  Filter& filt,
467  Function& fn );
468 
469  std::shared_ptr< detail::iterate_handler > iterate_simple_gaps(
470  buffer::gps_second_type gps_start,
471  buffer::gps_second_type gps_stop,
472  buffer::gps_second_type stride,
473  const connection::channel_names_type& channel_names );
474 
475  std::shared_ptr< detail::iterate_handler >
476  iterate_fast( buffer::gps_second_type gps_start,
477  buffer::gps_second_type gps_stop,
478  buffer::gps_second_type stride,
479  const connection::channel_names_type& channel_names );
480 
481  std::shared_ptr< detail::iterate_handler > iterate_available(
482  buffer::gps_second_type gps_start,
483  buffer::gps_second_type gps_stop,
484  buffer::gps_second_type stride,
485  const connection::channel_names_type& channel_names );
486 
487  std::shared_ptr< detail::iterate_handler >
488  iterate_full( buffer::gps_second_type gps_start,
489  buffer::gps_second_type gps_stop,
490  buffer::gps_second_type stride,
491  const connection::channel_names_type& channel_names );
492 
493  private:
494  static bool initialized;
495  };
496 
497  template < typename Filter, typename Function >
498  void
499  conn_p_type::retreive_channels_from_nds2(
500  const std::vector< std::string >& types,
501  const std::string& channel_glob,
502  Filter& filter,
503  Function& fn )
504  {
505  std::vector< char > lineBuffer;
506  lineBuffer.resize( 512 );
507 
508  for ( std::vector< std::string >::const_iterator cur =
509  types.begin( );
510  cur != types.end( );
511  ++cur )
512  {
513  NDS::detail::dout( ) << "Retreiving channels for type " << *cur
514  << std::endl;
515  {
516  std::ostringstream cmdBuf;
517  // cmdBuf << "` " << current_epoch_.gps_start << " " <<
518  // cur_type;
519  if ( current_epoch_.gps_stop ==
520  current_epoch_.gps_start + 1 )
521  {
522  cmdBuf << "get-channels " << current_epoch_.gps_start
523  << " ";
524  }
525  else
526  {
527  cmdBuf << "get-channels 0 ";
528  }
529  cmdBuf << *cur;
530  if ( channel_glob.compare( "*" ) != 0 )
531  {
532  cmdBuf << " {" << channel_glob << "}";
533  }
534  cmdBuf << ";\n";
535  validate_daq(
536  daq_send( &( handle ), cmdBuf.str( ).c_str( ) ) );
537  NDS::detail::dout( ) << "Sent command '" << cmdBuf.str( )
538  << "'" << std::endl;
539  }
540 
541  socket_t fd = handle.conceal->sockfd;
542  uint4_type count = 0;
543  NDS::detail::dout( ) << "Reading channel count" << std::endl;
544  if ( !_read_uint4( fd, &count ) )
545  {
546  throw connection::unexpected_channels_received_error( );
547  }
548  // result.reserve(count);
549 
550  NDS::detail::dout( ) << "Expecting " << count << "channels"
551  << std::endl;
552  for ( uint4_type i = 0; i < count; ++i )
553  {
554  uint4_type line_size = 0;
555  if ( !_read_uint4( handle.conceal->sockfd, &line_size ) )
556  {
557  throw connection::unexpected_channels_received_error( );
558  }
559  if ( line_size < lineBuffer.size( ) )
560  {
561  lineBuffer.resize( line_size );
562  }
563  if ( !_read_buffer( handle.conceal->sockfd,
564  &( lineBuffer[ 0 ] ),
565  line_size ) )
566  {
567  throw connection::unexpected_channels_received_error( );
568  }
569  channel curCh =
570  _parse_nds2_get_channel_line( &( lineBuffer[ 0 ] ) );
571  if ( filter( curCh ) )
572  {
573  fn( curCh );
574  }
575  }
576  }
577  }
578  } // namespace detail
579 
580 } // namespace NDS
581 
582 #endif // NDS_CONNECTION_DETAIL_PTYPE_HH