5 #ifndef NDS2_CLIENT_NDS_ITERATE_HANDLERS_HH
6 #define NDS2_CLIENT_NDS_ITERATE_HANDLERS_HH
12 #include "nds_iterate_handler.hh"
27 class iterate_fast_handler :
public detail::iterate_handler
30 buffer::gps_second_type cur_gps_;
31 buffer::gps_second_type gps_start_;
32 buffer::gps_second_type gps_stop_;
33 buffer::gps_second_type stride_;
35 buffers_type next_entry_;
37 template <
typename T >
39 safe_add( T val1, T val2 )
41 static const T max_val = std::numeric_limits< T >::max( );
42 if ( val2 >= max_val - val1 )
50 get_next_block( buffers_type& output )
52 auto parent = conn( );
55 throw std::runtime_error(
"Connection object is null" );
57 parent->next_raw_buffer( output );
58 if ( gps_start_ == 0 )
60 gps_start_ = output.front( ).Start( );
61 gps_stop_ = safe_add< NDS::buffer::gps_second_type >(
63 ( gps_stop_ == 0 ? 1893024016 : gps_stop_ ) );
64 parent->request_end_time( gps_stop_ );
66 if ( !output.empty( ) )
68 cur_gps_ = output.front( ).Stop( );
70 if ( cur_gps_ >= gps_stop_ && online_ )
72 parent->cycle_nds1_connection( );
78 buffer::gps_second_type gps_start,
79 buffer::gps_second_type gps_stop,
80 buffer::gps_second_type stride,
81 const connection::channel_names_type& channel_names,
82 std::shared_ptr< NDS::detail::conn_p_type >&& parent )
83 : iterate_handler( std::move( parent ) ), cur_gps_( 0 ),
84 gps_start_( gps_start ), gps_stop_( gps_stop ),
85 stride_( stride ), online_( gps_start == 0 )
87 buffer::gps_second_type stop_time =
88 ( parent->protocol == NDS::connection::PROTOCOL_ONE &&
92 parent->issue_iterate(
93 gps_start_, stop_time, stride, channel_names );
95 ~iterate_fast_handler( )
override =
default;
100 if ( next_entry_.size( ) > 0 )
104 if ( cur_gps_ >= gps_stop_ && gps_start_ != 0 )
110 get_next_block( next_entry_ );
120 next( buffers_type& output )
override
122 while ( has_next( ) )
124 if ( next_entry_.size( ) > 0 )
126 output.swap( next_entry_ );
127 next_entry_.clear( );
130 get_next_block( next_entry_ );
145 class iterate_handler_with_simple_gaps :
public detail::iterate_handler
147 typedef std::vector< NDS::channel >::iterator ch_vec_iter;
148 typedef std::vector< NDS::channel >::const_iterator
159 struct indexed_buffers
163 buffer::gps_second_type cur_;
164 buffer::gps_second_type stride_;
166 std::unique_ptr< NDS::detail::delayed_gap_handler > >
170 : bufs_( ), cur_( 0 ), stride_( 0 ), delay_( )
175 initialize(
const std::vector< NDS::channel >& channel_list,
176 buffer::gps_second_type gps_time,
177 buffer::gps_second_type stride )
179 if ( bufs_.size( ) != channel_list.size( ) )
181 bufs_.resize( channel_list.size( ) );
184 for (
int i = 0; i < channel_list.size( ); ++i )
186 const NDS::channel& cur_ch = channel_list[ i ];
187 NDS::buffer& cur_buf = bufs_[ i ];
189 cur_buf.reset_channel_info( cur_ch, gps_time, 0 );
190 if ( cur_ch.Type( ) &
191 NDS::channel::CHANNEL_TYPE_MTREND )
195 static_cast< buffer::size_type >( 60 ) );
199 cur_buf.resize( stride *
200 static_cast< buffer::size_type >(
201 cur_ch.SampleRate( ) ) );
218 if ( bufs_.size( ) == 0 )
222 if ( cur_ >= bufs_.front( ).Start( ) + stride_ )
236 reset( buffers_type& other )
241 if ( bufs_.size( ) > 0 )
243 cur_ = bufs_.front( ).Start( );
244 stride_ = bufs_.front( ).Stop( ) - cur_;
263 buffer::gps_second_type
273 buffer::gps_second_type
276 if ( bufs_.size( ) == 0 )
280 return bufs_.front( ).Start( );
289 buffer::gps_second_type
292 if ( bufs_.size( ) == 0 )
296 return start( ) + stride_ - cur_;
304 advance( buffer::gps_second_type delta )
321 buffer::gps_second_type
322 append_data_from( indexed_buffers& other )
324 if ( other.bufs_.size( ) != bufs_.size( ) )
328 if ( other.cur( ) != cur( ) )
332 buffer::gps_second_type secs_appended =
333 std::min< buffer::gps_second_type >(
334 remaining( ), other.remaining( ) );
336 buffer::gps_second_type src_offset_sec =
337 other.cur( ) - other.start( );
338 buffer::gps_second_type dest_offset_sec = cur( ) - start( );
340 for ( buffers_type::size_type i = 0; i < bufs_.size( );
343 NDS::buffer& src_buf = other.bufs_[ i ];
344 NDS::buffer& dest_buf = bufs_[ i ];
346 auto src =
const_cast< char*
>(
347 reinterpret_cast< const char*
>(
348 src_buf.cbegin<
void >( ) ) );
349 src += src_buf.samples_to_bytes(
350 src_buf.seconds_to_samples( src_offset_sec, 0 ) );
351 auto dest =
const_cast< char*
>(
352 reinterpret_cast< const char*
>(
353 dest_buf.cbegin<
void >( ) ) );
354 dest += dest_buf.samples_to_bytes(
355 dest_buf.seconds_to_samples( dest_offset_sec, 0 ) );
357 buffer::size_type copy_bytes = src_buf.samples_to_bytes(
358 src_buf.seconds_to_samples( secs_appended, 0 ) );
359 std::copy( src, src + copy_bytes, dest );
361 advance( secs_appended );
362 other.advance( secs_appended );
363 return secs_appended;
374 apply_gap_handler( detail::gap_handler& handler,
375 buffer::gps_second_type gps_stop )
377 if ( gps_stop <= cur_ )
381 buffer::gps_second_type start_time = start( );
382 if ( gps_stop > start_time + stride_ )
384 gps_stop = start_time + stride_;
387 for ( buffers_type::iterator cur_buf = bufs_.begin( );
388 cur_buf != bufs_.end( );
391 std::unique_ptr< detail::delayed_gap_handler >
392 delayed_handler( handler.fill_gap(
394 cur_buf->seconds_to_samples( cur_ -
396 cur_buf->seconds_to_samples( gps_stop -
398 if ( delayed_handler )
400 delay_.push_back( std::move( delayed_handler ) );
403 advance( gps_stop - cur_ );
407 apply_delayed_handlers( )
409 for ( std::vector< std::unique_ptr<
410 detail::delayed_gap_handler > >::iterator cur =
412 cur != delay_.end( );
424 buffer::gps_second_type cur_gps_;
425 buffer::gps_second_type gps_start_;
426 buffer::gps_second_type gps_stop_;
427 buffer::gps_second_type stride_;
428 std::unique_ptr< detail::gap_handler > gap_handler_;
429 std::vector< NDS::channel > channel_list_;
431 indexed_buffers current_buffer_;
432 indexed_buffers pending_buffer_;
434 bool is_more_data_available_;
437 iterate_handler_with_simple_gaps(
438 buffer::gps_second_type gps_start,
439 buffer::gps_second_type gps_stop,
440 buffer::gps_second_type stride,
441 const connection::channel_names_type& channel_names,
442 std::shared_ptr< NDS::detail::conn_p_type >&& parent,
443 std::unique_ptr< detail::gap_handler > ghandler )
444 : iterate_handler( std::move( parent ) ), cur_gps_( gps_start ),
445 gps_start_( gps_start ), gps_stop_( gps_stop ),
446 stride_( stride ), gap_handler_( std::move( ghandler ) ),
447 channel_list_( ), current_buffer_( ), pending_buffer_( ),
448 is_more_data_available_( true )
450 channel_list_.reserve( channel_names.size( ) );
451 parent->issue_iterate( gps_start_,
458 stride_ = parent->calculate_stride(
459 gps_start_, gps_stop_, channel_list_ );
461 current_buffer_.initialize( channel_list_, cur_gps_, stride_ );
463 ~iterate_handler_with_simple_gaps( )
override =
default;
468 return conn( ) && ( cur_gps_ < gps_stop_ );
472 next( buffers_type& output )
override
474 auto parent = conn( );
475 if ( !has_next( ) || !parent )
477 throw std::out_of_range(
"No Next" );
480 buffer::gps_second_type segment_end = cur_gps_ + stride_;
481 if ( current_buffer_.cur( ) > segment_end )
483 throw std::runtime_error(
484 "Impossible condition triggered, gap "
485 "handled iterator went beyond "
488 while ( current_buffer_.cur( ) < segment_end )
503 if ( pending_buffer_.valid( ) )
505 if ( current_buffer_.cur( ) == pending_buffer_.cur( ) )
507 current_buffer_.append_data_from( pending_buffer_ );
509 else if ( current_buffer_.cur( ) <
510 pending_buffer_.cur( ) )
512 current_buffer_.apply_gap_handler(
514 std::min( pending_buffer_.cur( ),
518 else if ( is_more_data_available_ )
523 parent->next_raw_buffer( tmp );
524 pending_buffer_.reset( tmp );
525 if ( pending_buffer_.bufs( ).size( ) == 0 )
527 is_more_data_available_ =
false;
532 is_more_data_available_ =
false;
537 current_buffer_.apply_gap_handler( *gap_handler_,
541 current_buffer_.apply_delayed_handlers( );
542 output.swap( current_buffer_.bufs( ) );
544 if ( cur_gps_ + stride_ > gps_stop_ )
546 stride_ = gps_stop_ - cur_gps_;
548 current_buffer_.initialize( channel_list_, cur_gps_, stride_ );
561 class iterate_available_handler :
public detail::iterate_handler
563 detail::request_fragments_type fragment_list_;
564 buffer::size_type cur_segment_;
565 buffer::gps_second_type cur_gps_;
566 buffer::gps_second_type gps_start_;
567 buffer::gps_second_type gps_stop_;
568 buffer::gps_second_type max_stride_;
569 channel::channel_names_type names_;
570 buffers_type next_entry_;
573 setup_next_step( NDS::detail::conn_p_type& parent )
575 NDS::detail::dout( ) <<
"setup_next_iterate_step()"
579 NDS::detail::dout( ) <<
"Finding first segment"
582 cur_gps_ = gps_start_;
585 if ( fragment_list_[ 0 ].time_spans.empty( ) )
590 simple_segment_list_type::value_type cur_segment =
591 ( fragment_list_[ 0 ].time_spans )[ cur_segment_ ];
593 NDS::detail::dout( ) <<
"status " << cur_gps_ <<
" "
594 << cur_segment.gps_start <<
"-"
595 << cur_segment.gps_stop << std::endl;
596 if ( cur_gps_ <= cur_segment.gps_start )
598 NDS::detail::dout( ) <<
"Starting a segment " << std::endl;
599 buffer::gps_second_type delta =
600 cur_segment.gps_stop - cur_segment.gps_start;
601 buffer::gps_second_type stride =
602 ( max_stride_ > delta ? delta : max_stride_ );
603 NDS::detail::dout( ) << cur_segment.gps_start <<
" "
604 << cur_segment.gps_stop <<
" "
605 << stride << std::endl;
606 parent.issue_iterate( cur_segment.gps_start,
607 cur_segment.gps_stop,
611 else if ( cur_gps_ == cur_segment.gps_stop )
613 NDS::detail::dout( ) <<
"Ending a segment " << std::endl;
616 fragment_list_[ 0 ].time_spans.size( ) )
620 simple_segment_list_type::value_type new_segment =
621 ( fragment_list_[ 0 ].time_spans )[ cur_segment_ ];
622 cur_gps_ = new_segment.gps_start;
623 buffer::gps_second_type delta =
624 new_segment.gps_stop - new_segment.gps_start;
625 buffer::gps_second_type stride =
626 ( max_stride_ > delta ? delta : max_stride_ );
627 NDS::detail::dout( ) << new_segment.gps_start <<
" "
628 << new_segment.gps_stop <<
" "
630 parent.issue_iterate( new_segment.gps_start,
631 new_segment.gps_stop,
637 NDS::detail::dout( ) <<
"Mid segment" << std::endl;
642 iterate_available_handler(
643 buffer::gps_second_type gps_start,
644 buffer::gps_second_type gps_stop,
645 buffer::gps_second_type stride,
646 const channel::channel_names_type& channel_names,
647 std::shared_ptr< NDS::detail::conn_p_type >&& parent )
648 : iterate_handler( std::move( parent ) ), fragment_list_( ),
649 cur_segment_( 0 ), cur_gps_( 0 ), gps_start_( gps_start ),
650 gps_stop_( gps_stop ), max_stride_( stride ),
651 names_( channel_names )
653 if ( parent->protocol == connection::PROTOCOL_ONE ||
656 NDS::detail::dout( ) <<
"Fast path" << std::endl;
658 parent->issue_iterate(
659 gps_start, gps_stop, stride, channel_names );
662 NDS::detail::dout( ) <<
"Planning fetches" << std::endl;
665 retval.resize( channel_names.size( ) );
666 parent->plan_fetches(
667 gps_start_, gps_stop_, names_, retval, fragment_list_ );
670 if ( fragment_list_.size( ) != 1 )
672 throw connection::daq_error(
674 "The requested channels have "
675 "different/non-identical gaps." );
678 setup_next_step( *( parent.get( ) ) );
681 ~iterate_available_handler( )
override =
default;
686 if ( next_entry_.size( ) > 0 )
690 auto parent = conn( );
691 if ( cur_gps_ >= gps_stop_ || !parent )
697 parent->next_raw_buffer( next_entry_ );
698 if ( next_entry_.size( ) == 0 )
702 cur_gps_ = next_entry_[ 0 ].Stop( );
703 setup_next_step( *parent );
713 next( buffers_type& output )
override
717 if ( next_entry_.size( ) > 0 )
719 output.swap( next_entry_ );
720 next_entry_.clear( );
723 auto parent = conn( );
726 throw( std::out_of_range(
"No Next" ) );
728 parent->next_raw_buffer( retval );
729 if ( retval.empty( ) )
731 throw( std::out_of_range(
"No Next" ) );
735 cur_gps_ = retval[ 0 ].Stop( );
736 setup_next_step( *parent );
738 output.swap( retval );
754 class iterate_full_handler :
public detail::iterate_handler
756 buffer::gps_second_type gps_start_;
757 buffer::gps_second_type gps_stop_;
758 buffer::gps_second_type gps_stride_;
759 buffer::gps_second_type cur_gps_;
760 channel::channel_names_type names_;
761 channels_type channels_;
765 iterate_full_handler(
766 buffer::gps_second_type gps_start,
767 buffer::gps_second_type gps_stop,
768 buffer::gps_second_type stride,
769 const channel::channel_names_type& channel_names,
771 const channels_type& channel_list,
772 std::shared_ptr< NDS::detail::conn_p_type >&& parent )
773 : iterate_handler( std::move( parent ) ),
774 gps_start_( gps_start ), gps_stop_( gps_stop ),
775 gps_stride_( stride ), cur_gps_( gps_start ),
776 names_( channel_names ), prev_epoch_( prev_epoch ),
777 channels_( channel_list ){
781 ~iterate_full_handler( )
override =
default;
786 cur_gps_ += gps_stride_;
791 return ( gps_stride_ == 0 || cur_gps_ >= gps_stop_ );
801 if ( gps_start_ == 0 && gps_stop_ == 0 )
809 next( buffers_type& output )
override
813 NDS::detail::dout( ) <<
"next_full" << std::endl;
815 auto parent = conn( );
818 throw std::out_of_range(
"No next buffer" );
821 if ( gps_start_ == 0 )
823 NDS::detail::dout( ) <<
"fast path" << std::endl;
824 parent->next_raw_buffer( retval );
829 <<
"Disabling request in progress in order to fetch"
831 parent->request_in_progress(
false );
833 buffer::gps_second_type start = cur_gps_;
834 buffer::gps_second_type end = gps_stop_;
835 buffer::gps_second_type stride = gps_stride_;
836 buffer::gps_second_type stop = start + stride;
841 stride = stop - start;
844 NDS::detail::dout( ) <<
"Issuing fetch( " << start <<
", "
845 << stop <<
" ... ) " << std::endl;
847 parent->fetch( start, stop, names_, &( channels_ ) );
852 NDS::detail::dout( ) <<
"iterate_full is complete"
854 parent->set_epoch( prev_epoch_.gps_start,
855 prev_epoch_.gps_stop );
859 NDS::detail::dout( ) <<
"There is more to iterate, "
860 "request_in_progres enabled"
862 parent->request_in_progress(
true );
865 output.swap( retval );
871 #endif // NDS2_CLIENT_NDS_ITERATE_HANDLERS_HH