1 #ifndef NDS_CONNECTION_DETAIL_PTYPE_HH
2 #define NDS_CONNECTION_DETAIL_PTYPE_HH
10 #include "debug_stream.hh"
12 #include "daq_config.h"
14 #include "daqc_internal.h"
15 #include "daqc_response.h"
17 #include "nds_channel.hh"
18 #include "nds_buffer.hh"
20 #include "nds_gap_handler.hh"
21 #include "nds_request_fragment.hh"
22 #include "nds_iterate_handler.hh"
24 #include "nds_parameter_block.hh"
26 #include "nds_channel_cache.hh"
// Functor that tests a channel against a data-type mask and a sample-rate
// range.
// NOTE(review): this text appears to be an extraction in which every line kept
// its original line number as a prefix and some lines (braces, access
// specifiers, return type, `return`) were dropped — confirm against the
// pristine header before building.
37 class basic_channel_filter
// Stores the mask and the two rate bounds that operator() compares against.
40 basic_channel_filter( channel::data_type data_type_mask,
41 channel::sample_rate_type min_sample_rate,
42 channel::sample_rate_type max_sample_rate )
43 : data_type_mask_( data_type_mask ),
44 min_sample_rate_( min_sample_rate ),
45 max_sample_rate_( max_sample_rate )
// The visible expression is true when the channel's data type shares at least
// one bit with the mask (or the mask equals DEFAULT_DATA_MASK) and the
// channel's sample rate lies within [min_sample_rate_, max_sample_rate_],
// both bounds inclusive. Presumably this expression is the returned value —
// the `return` keyword is not visible here.
49 operator( )(
const channel& ch )
52 ( ( ( ch.DataType( ) & data_type_mask_ ) != 0 ) ||
53 data_type_mask_ == NDS::channel::DEFAULT_DATA_MASK ) &&
54 ( ch.SampleRate( ) <= max_sample_rate_ ) &&
55 ( ch.SampleRate( ) >= min_sample_rate_ ) );
// Filter criteria captured at construction.
59 channel::data_type data_type_mask_;
60 channel::sample_rate_type min_sample_rate_;
61 channel::sample_rate_type max_sample_rate_;
// Functor that appends each channel it is called with to a caller-owned
// vector.
// NOTE(review): lines of the original definition (braces, the constructor's
// initializer list, the operator's return type) are missing from this
// excerpt.
67 class push_back_channel
// Takes the destination vector by reference; buffer_ below is a reference
// member, so the vector must outlive this functor.
70 push_back_channel( std::vector< channel >& buffer )
// Copies `ch` onto the end of the destination vector.
75 operator( )(
const channel& ch )
77 buffer_.push_back( ch );
// Destination vector (not owned).
81 std::vector< channel >& buffer_;
// Constructor of a counting functor: starts count_ at zero.
// NOTE(review): the enclosing class header and the body of operator() are
// not visible in this excerpt; presumably each call increments count_ —
// confirm against the full header.
90 count_channels( ) : count_( 0 )
// Called once per channel.
94 operator( )(
const channel& ch )
// Carries a GPS start/stop pair and exposes reset_buffer() to prepare a
// buffer for a given channel.
// NOTE(review): braces and access specifiers are missing from this excerpt.
113 class buffer_initializer
// Stores the GPS bounds. The parameters share their names with the members;
// inside a member-initializer the name before the parentheses is the member
// and the name inside is the parameter, so this initializes the members from
// the arguments.
117 buffer_initializer( buffer::gps_second_type gps_start,
118 buffer::gps_second_type gps_stop )
119 : gps_start( gps_start ), gps_stop( gps_stop ){};
// Resets `cur_buffer` using `channel_info`; does not modify this object
// (const member). The definition is not visible here.
122 void reset_buffer( buffer* cur_buffer,
123 const channel& channel_info )
const;
// GPS bounds captured at construction.
126 buffer::gps_second_type gps_start;
127 buffer::gps_second_type gps_stop;
// Constructor: initializes handle_ from the daq_t passed by reference.
// NOTE(review): the enclosing struct header and the declaration of handle_
// are not visible in this excerpt.
133 daq_accessor( daq_t& handle ) : handle_( handle ){};
// Private implementation state and operations behind a connection object.
// Derives from std::enable_shared_from_this.
// NOTE(review): this text appears to be an extraction in which every line kept
// its original line number as a prefix and a number of lines (braces, some
// return types, some function names and parameters) were dropped. Comments
// below describe only what the visible lines show; confirm against the
// pristine header.
148 struct conn_p_type :
public std::enable_shared_from_this< conn_p_type >
// Reason code accepted by finalize_iterate(); the enumerators are not visible
// in this excerpt.
150 enum class iterate_finalize_reason
// Local shorthands for the channel-related types used in the signatures below.
156 typedef channel::channel_type channel_type;
157 typedef channel::data_type data_type;
158 typedef channel::sample_rate_type sample_rate_type;
// String-keyed map of daq_channel_t records (presumably keyed by channel
// name — confirm).
160 typedef std::map< std::string, daq_channel_t >
161 channel_mem_cache_type;
162 typedef NDS::detail::channel_cache_nds1 channel_cache_type;
// Type of the request start/end times stored below.
163 typedef long time_type;
165 typedef nds_socket_type socket_t;
// Connection endpoint and protocol selector.
167 connection::host_type host;
168 connection::port_type port;
169 connection::protocol_type protocol;
// NOTE(review): member name is spelled "accesor_" (one 's'); it is part of
// the interface, so it is left unchanged here.
171 daq_accessor accesor_;
173 channel_cache_type channel_cache_;
174 channel_mem_cache_type channel_mem_cache_;
// Request bookkeeping; read and written through the request_start_time /
// request_end_time / request_in_progress accessors further down.
176 time_type request_start_time_;
177 time_type request_end_time_;
178 bool request_in_progress_;
180 NDS::parameters parameters_;
// Constructs the connection state from a parameter set. Explicit, so an
// NDS::parameters value is never implicitly converted to a conn_p_type.
183 explicit conn_p_type(
const NDS::parameters& params );
// Availability query for the named channels, given as an epoch ...
190 availability_list_type get_availability(
191 const epoch& time_span,
192 const connection::channel_names_type& channel_names );
// ... or as an explicit GPS start/stop pair.
193 availability_list_type get_availability(
194 buffer::gps_second_type gps_start,
195 buffer::gps_second_type gps_stop,
196 const connection::channel_names_type& channel_names );
198 bool check( buffer::gps_second_type gps_start,
199 buffer::gps_second_type gps_stop,
200 const connection::channel_names_type& channel_names );
// NOTE(review): the return-type lines of has_gaps and fetch are missing from
// this excerpt.
203 has_gaps( buffer::gps_second_type gps_start,
204 buffer::gps_second_type gps_stop,
205 const connection::channel_names_type& channel_names );
// reference_channels is optional and defaults to nullptr.
208 fetch( buffer::gps_second_type gps_start,
209 buffer::gps_second_type gps_stop,
210 const connection::channel_names_type& channel_names,
211 channels_type* reference_channels =
nullptr );
// Protocol-specific channel counters. Every filter argument after the glob
// has a default taken from the channel class constants.
213 size_t count_channels_nds1(
214 std::string channel_glob,
215 channel_type channel_type_mask = channel::DEFAULT_CHANNEL_MASK,
216 data_type data_type_mask = channel::DEFAULT_DATA_MASK,
217 sample_rate_type min_sample_rate = channel::MIN_SAMPLE_RATE,
218 sample_rate_type max_sample_rate = channel::MAX_SAMPLE_RATE );
220 size_t count_channels_nds2(
221 std::string channel_glob,
222 channel_type channel_type_mask = channel::DEFAULT_CHANNEL_MASK,
223 data_type data_type_mask = channel::DEFAULT_DATA_MASK,
224 sample_rate_type min_sample_rate = channel::MIN_SAMPLE_RATE,
225 sample_rate_type max_sample_rate = channel::MAX_SAMPLE_RATE );
// Inline dispatcher (its name/return-type line is missing from this
// excerpt): a PROTOCOL_ONE connection forwards to count_channels_nds1; the
// second return forwards to count_channels_nds2, presumably in the else
// branch.
229 std::string channel_glob,
230 channel_type channel_type_mask = channel::DEFAULT_CHANNEL_MASK,
231 data_type data_type_mask = channel::DEFAULT_DATA_MASK,
232 sample_rate_type min_sample_rate = channel::MIN_SAMPLE_RATE,
233 sample_rate_type max_sample_rate = channel::MAX_SAMPLE_RATE )
235 if ( protocol == connection::protocol_type::PROTOCOL_ONE )
237 return count_channels_nds1( channel_glob,
243 return count_channels_nds2( channel_glob,
// Channel search driven by a predicate object; results go into `output`.
250 void find_channels( channels_type& output,
251 const NDS::channel_predicate_object& pred );
// Protocol-specific channel searches; same filter defaults as the counters
// above.
253 void find_channels_nds1(
254 channels_type& output,
255 std::string channel_glob,
256 channel_type channel_type_mask = channel::DEFAULT_CHANNEL_MASK,
257 data_type data_type_mask = channel::DEFAULT_DATA_MASK,
258 sample_rate_type min_sample_rate = channel::MIN_SAMPLE_RATE,
259 sample_rate_type max_sample_rate = channel::MAX_SAMPLE_RATE );
261 void find_channels_nds2(
262 channels_type& output,
263 std::string channel_glob,
264 channel_type channel_type_mask = channel::DEFAULT_CHANNEL_MASK,
265 data_type data_type_mask = channel::DEFAULT_DATA_MASK,
266 sample_rate_type min_sample_rate = channel::MIN_SAMPLE_RATE,
267 sample_rate_type max_sample_rate = channel::MAX_SAMPLE_RATE );
// Epoch handling: list epochs, select one (by epoch value, by name, or by GPS
// bounds) and read back the current selection.
269 epochs_type get_epochs( );
271 bool set_epoch_if_changed(
const epoch& time_span );
272 bool set_epoch(
const std::string& epoch_name );
274 bool set_epoch( buffer::gps_second_type gps_start,
275 buffer::gps_second_type gps_stop );
277 epoch current_epoch( )
const;
279 const channel::hash_type& hash( )
const;
// Iteration entry point; returns a shared iterate_handler.
281 std::shared_ptr< detail::iterate_handler > dispatch_iterate(
282 buffer::gps_second_type gps_start,
283 buffer::gps_second_type gps_stop,
284 buffer::gps_second_type stride,
285 const connection::channel_names_type& channel_names );
// NOTE(review): the name/return-type line of the next declaration is
// missing from this excerpt. final_channel_list is optional (defaults to
// nullptr).
288 buffer::gps_second_type gps_start,
289 buffer::gps_second_type gps_stop,
290 buffer::gps_second_type stride,
291 const connection::channel_names_type& channel_names,
292 std::vector< NDS::channel >* final_channel_list =
nullptr );
294 void finalize_iterate( detail::iterate_handler* handler,
295 iterate_finalize_reason reason );
301 void next_raw_buffer( buffers_type& output );
// Operates on the byte range delimited by `start` and `end` for the given
// data type and element size (whether `end` is inclusive is not visible
// here).
305 void fill_gap( data_type DataType,
306 channel::size_type DataSizeType,
307 unsigned char* start,
308 unsigned char* end );
// Getter/setter pairs for the request bookkeeping members. The return-type
// lines are missing from this excerpt.
311 request_start_time( )
const
313 return request_start_time_;
317 request_start_time( time_type Value )
319 request_start_time_ = Value;
323 request_end_time( )
const
325 return request_end_time_;
329 request_end_time( time_type Value )
331 request_end_time_ = Value;
335 request_in_progress( )
const
337 return request_in_progress_;
341 request_in_progress(
bool Value )
343 request_in_progress_ = Value;
353 NDS::buffer::gps_second_type
354 calculate_stride( NDS::buffer::gps_second_type gps_start,
355 NDS::buffer::gps_second_type gps_stop,
356 NDS::channels_type& selected_channels )
const;
// Inline fragment (the function header is missing from this excerpt): when a
// request is in progress on a PROTOCOL_ONE connection, daq_recv_next is
// called on the handle; the visible throw of connection::daq_error( rc )
// sits after the `rc != DAQD_NOT_FOUND` test, presumably inside it.
367 if ( ( request_in_progress( ) ) &&
368 ( protocol == connection::protocol_type::PROTOCOL_ONE ) )
370 int rc = daq_recv_next( &handle );
372 if ( rc != DAQD_NOT_FOUND )
377 throw connection::daq_error( rc );
386 NDS::buffer::gps_second_type cur_nds1_gpstime( );
// Validation helpers; both are const members.
388 void validate( )
const;
390 void validate_daq(
int RetCode )
const;
// NOTE(review): the trailing parameter(s) of infer_daq_channel_info are
// missing from this excerpt.
392 void infer_daq_channel_info(
const std::string& channel_name,
393 daq_channel_t& channel,
// Fills queryTypes with strings derived from the channel-type mask.
396 void channel_mask_to_query_type_strings(
397 channel_type channel_type_mask,
398 std::vector< std::string >& queryTypes );
// Declared with a non-throwing exception specification.
401 std::string get_last_message( )
const throw( );
403 std::
string err_msg_unexpected_no_data_found(
404 buffer::gps_second_type gps_start,
405 buffer::gps_second_type gps_stop,
406 const channel::channel_names_type& names );
// Takes dest_buffers and retval by non-const reference (return-type line
// missing from this excerpt).
409 plan_fetches( buffer::gps_second_type gps_start,
410 buffer::gps_second_type gps_stop,
411 const connection::channel_names_type& channel_names,
412 buffers_type& dest_buffers,
413 request_fragments_type& retval );
// Inline fragment (function header missing): when handle.conceal is
// non-null, copies the max_nds1_command_count parameter into
// handle.conceal->max_command_count, converted to size_t.
418 detail::parameter_accessor paccess( parameters_ );
419 if ( handle.conceal )
421 handle.conceal->max_command_count =
static_cast< size_t >(
422 paccess( ).max_nds1_command_count( ) );
426 void cycle_nds1_connection( );
429 epoch resolve_epoch(
const std::string& epoch_name );
// have_minute_trends and bytes_per_sample are non-const references;
// final_channel_list is optional (defaults to nullptr).
430 void setup_daq_chanlist(
431 buffer::gps_second_type gps_start,
432 const connection::channel_names_type& channel_names,
433 bool& have_minute_trends,
434 double& bytes_per_sample,
435 std::vector< NDS::channel >* final_channel_list =
nullptr );
437 void process_check_data_result(
int result,
bool gaps_ok );
439 void fetch_fragment( request_fragment& fragment,
440 const buffer_initializer& initializer,
441 bool buffers_initialized );
443 std::vector< buffers_type > fetch_available(
444 buffer::gps_second_type gps_start,
445 buffer::gps_second_type gps_stop,
446 const connection::channel_names_type& channel_names );
// Held as weak_ptr, so this member does not keep the handler alive.
448 std::weak_ptr< detail::iterate_handler > iterate_handler_;
450 epochs_type epoch_cache_;
451 epoch current_epoch_;
452 channel::hash_type hash_;
454 void load_epochs_to_cache( );
// Low-level helpers used by retreive_channels_from_nds2: parse one channel
// line, and read a 4-byte unsigned value / a sized buffer from a socket. The
// bool results are treated as success flags by that caller.
456 channel _parse_nds2_get_channel_line(
char* buffer );
458 bool _read_uint4( socket_t fd, uint4_type* dest );
460 bool _read_buffer( socket_t fd,
void* dest,
size_t size );
// Member template defined out of line after this struct. NOTE(review): the
// trailing parameters are missing from this excerpt.
462 template <
typename Filter,
typename Function >
463 void retreive_channels_from_nds2(
464 const std::vector< std::string >& types,
465 const std::string& channel_glob,
// Iteration variants sharing one signature; each returns a shared
// iterate_handler.
469 std::shared_ptr< detail::iterate_handler > iterate_simple_gaps(
470 buffer::gps_second_type gps_start,
471 buffer::gps_second_type gps_stop,
472 buffer::gps_second_type stride,
473 const connection::channel_names_type& channel_names );
475 std::shared_ptr< detail::iterate_handler >
476 iterate_fast( buffer::gps_second_type gps_start,
477 buffer::gps_second_type gps_stop,
478 buffer::gps_second_type stride,
479 const connection::channel_names_type& channel_names );
481 std::shared_ptr< detail::iterate_handler > iterate_available(
482 buffer::gps_second_type gps_start,
483 buffer::gps_second_type gps_stop,
484 buffer::gps_second_type stride,
485 const connection::channel_names_type& channel_names );
487 std::shared_ptr< detail::iterate_handler >
488 iterate_full( buffer::gps_second_type gps_start,
489 buffer::gps_second_type gps_stop,
490 buffer::gps_second_type stride,
491 const connection::channel_names_type& channel_names );
// Class-wide flag; what it guards is not visible in this excerpt.
494 static bool initialized;
// Out-of-line definition of conn_p_type::retreive_channels_from_nds2.
//
// For each entry of `types`, builds a "get-channels" command, sends it with
// daq_send, reads a 4-byte channel count from the socket and then, per
// channel, a 4-byte line size followed by that many bytes. Each line is
// parsed with _parse_nds2_get_channel_line and the result is tested with
// `filter`.
//
// Throws connection::unexpected_channels_received_error when reading the
// count, a line size, or a line body fails.
//
// NOTE(review): this text appears to be an extraction in which every line kept
// its original line number as a prefix and some lines (trailing parameters,
// braces, statement tails, the code after the filter test) were dropped. Only
// the visible lines are reproduced; the single code change is the buffer
// growth test marked FIX below.
497 template <
typename Filter,
typename Function >
499 conn_p_type::retreive_channels_from_nds2(
500 const std::vector< std::string >& types,
501 const std::string& channel_glob,
// Receive buffer reused for every channel line; it starts at 512 bytes and is
// grown on demand below.
505 std::vector< char > lineBuffer;
506 lineBuffer.resize( 512 );
508 for ( std::vector< std::string >::const_iterator cur =
513 NDS::detail::dout( ) <<
"Retreiving channels for type " << *cur
516 std::ostringstream cmdBuf;
// The epoch's GPS start is sent only when the current epoch spans exactly
// one second; otherwise the command carries 0.
519 if ( current_epoch_.gps_stop ==
520 current_epoch_.gps_start + 1 )
522 cmdBuf <<
"get-channels " << current_epoch_.gps_start
527 cmdBuf <<
"get-channels 0 ";
// A glob of "*" is omitted from the command; any other glob is appended in
// braces.
530 if ( channel_glob.compare(
"*" ) != 0 )
532 cmdBuf <<
" {" << channel_glob <<
"}";
536 daq_send( &( handle ), cmdBuf.str( ).c_str( ) ) );
537 NDS::detail::dout( ) <<
"Sent command '" << cmdBuf.str( )
541 socket_t fd = handle.conceal->sockfd;
542 uint4_type count = 0;
543 NDS::detail::dout( ) <<
"Reading channel count" << std::endl;
544 if ( !_read_uint4( fd, &count ) )
546 throw connection::unexpected_channels_received_error( );
550 NDS::detail::dout( ) <<
"Expecting " << count <<
"channels"
552 for ( uint4_type i = 0; i < count; ++i )
554 uint4_type line_size = 0;
555 if ( !_read_uint4( handle.conceal->sockfd, &line_size ) )
557 throw connection::unexpected_channels_received_error( );
// FIX: grow the buffer when the incoming line is larger than it. The
// previous test (`line_size < lineBuffer.size( )`) shrank the buffer to each
// shorter line and never enlarged it, so a later, longer line was read into
// &lineBuffer[ 0 ] beyond the vector's size (and a zero-length line left
// lineBuffer[ 0 ] with no element to refer to). With `>` the buffer never
// drops below its initial 512 bytes and always holds at least line_size
// bytes.
// NOTE(review): the parser receives only a char*, so it presumably relies on
// a terminator inside the received bytes — unchanged by this fix; confirm.
559 if ( line_size > lineBuffer.size( ) )
561 lineBuffer.resize( line_size );
563 if ( !_read_buffer( handle.conceal->sockfd,
564 &( lineBuffer[ 0 ] ),
567 throw connection::unexpected_channels_received_error( );
570 _parse_nds2_get_channel_line( &( lineBuffer[ 0 ] ) );
// Only channels accepted by the caller's filter are processed further (the
// code after this test is not visible in this excerpt).
571 if ( filter( curCh ) )
582 #endif // NDS_CONNECTION_DETAIL_PTYPE_HH