nds2-client - ClientDeveloper  0.16.8
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
buffered_reader.hh
Go to the documentation of this file.
1 //
2 // Created by Jonathan Hanks on 4/15/17.
3 //
4 
5 #ifndef NDS_PROXY_BUFFERED_READER_HH
6 #define NDS_PROXY_BUFFERED_READER_HH
7 
8 #include <algorithm>
9 #include <stdexcept>
10 #include <vector>
11 
12 namespace nds_impl
13 {
14  namespace Socket
15  {
16  template < typename Reader >
18  {
19  public:
20  explicit BufferedReader( Reader& r ) : r_( r ), buf_{}
21  {
22  }
23  // BufferedReader(Reader&& r): r_{std::move(r)}, buf_{} {}
24  // BufferedReader(Reader&& r): r_{std::move(r)} {}
25 
26  // BufferedReader(BufferedReader<Reader, ReaderCaptureType>&&
27  // other): r_{std::move(other.r_)}, buf_{std::move(other.buf)} {}
28 
34  void
35  write_all( const char* start, const char* end )
36  {
37  r_.write_all( start, end );
38  }
39 
48  char*
49  read_available( char* start, char* end )
50  {
51  size_t len_requested = end - start;
52 
53  if ( buf_.empty( ) )
54  fill_buffer( );
55  if ( buf_.size( ) < len_requested )
56  len_requested = buf_.size( );
57  std::copy( buf_.data( ), buf_.data( ) + len_requested, start );
58  consume( len_requested );
59  return start + len_requested;
60  }
61 
68  char*
69  read_exactly( char* start, const char* end )
70  {
71  size_t len_requested = end - start;
72 
73  auto result = peek( start, end );
74  consume( len_requested );
75  return result;
76  }
77 
88  char*
89  peek( char* start, const char* end )
90  {
91  size_t len_requested = end - start;
92 
93  while ( buf_.size( ) < len_requested )
94  fill_buffer( );
95  std::copy( buf_.data( ), buf_.data( ) + len_requested, start );
96  return start + len_requested;
97  }
98 
110  template < typename It >
111  std::vector< char >
112  read_until( It begin_set, It end_set )
113  {
114  std::vector< char > result;
115 
116  decltype( buf_.size( ) ) cur = 0;
117  auto remaining = buf_.size( );
118  for ( bool match = false; !match; ++cur, --remaining )
119  {
120  if ( remaining == 0 )
121  {
122  fill_buffer( );
123  remaining = buf_.size( ) - cur;
124  }
125 
126  if ( std::find( begin_set, end_set, buf_.at( cur ) ) !=
127  end_set )
128  {
129  match = true;
130  }
131  }
132  result.resize( cur );
133  std::copy(
134  buf_.begin( ), buf_.begin( ) + cur, result.begin( ) );
135  consume( cur );
136  return result;
137  }
138 
151  template < typename It, typename OutIt >
152  OutIt
153  read_until( It begin_set,
154  It end_set,
155  OutIt dest_start,
156  OutIt dest_end )
157  {
158  decltype( buf_.size( ) ) cur = 0;
159  auto remaining = buf_.size( );
160  OutIt dest_cur = dest_start;
161  bool match = false;
162  for ( ; !match && dest_cur != dest_end;
163  ++cur, --remaining, ++dest_cur )
164  {
165  if ( remaining == 0 )
166  {
167  fill_buffer( );
168  remaining = buf_.size( ) - cur;
169  }
170 
171  if ( std::find( begin_set, end_set, buf_.at( cur ) ) !=
172  end_set )
173  {
174  match = true;
175  }
176  }
177  if ( match )
178  {
179  std::copy( buf_.begin( ), buf_.begin( ) + cur, dest_start );
180  }
181  else
182  {
183  throw std::range_error(
184  "Unable to find matching sequence" );
185  }
186  consume( cur );
187  return dest_cur;
188  }
189 
194  std::vector< char >::size_type
196  {
197  return buf_.size( );
198  }
199 
200  private:
201  Reader& r_;
202  std::vector< char > buf_;
203 
204  void
206  {
207  const decltype( buf_.size( ) ) chunk_size = 1024;
208  auto data_size = buf_.size( );
209 
210  buf_.resize( data_size + chunk_size );
211  auto data_start = buf_.data( ) + data_size;
212 
213  auto result =
214  r_.read_available( data_start, data_start + chunk_size );
215 
216  auto data_read_in =
217  static_cast< size_t >( result - data_start );
218  if ( data_read_in > 0 )
219  {
220  buf_.resize( data_size + data_read_in );
221  }
222  else
223  {
224  buf_.resize( data_size );
225  throw std::runtime_error( "io operation failed" );
226  }
227  }
228 
229  void
230  consume( std::vector< char >::size_type count )
231  {
232  if ( count <= 0 )
233  return;
234  if ( count >= buf_.size( ) )
235  {
236  buf_.clear( );
237  return;
238  }
239  auto current_size = buf_.size( );
240  auto new_head = buf_.data( ) + count;
241 
242  std::move(
243  new_head, buf_.data( ) + current_size, buf_.data( ) );
244  buf_.resize( current_size - count );
245  }
246  };
247 
248  template < typename Reader >
250  {
251  Reader r_;
252  std::vector< char > buf_;
253 
254  void
256  {
257  const decltype( buf_.size( ) ) chunk_size = 1024;
258  auto data_size = buf_.size( );
259 
260  buf_.resize( data_size + chunk_size );
261  auto data_start = buf_.data( ) + data_size;
262 
263  auto result =
264  r_.read_available( data_start, data_start + chunk_size );
265 
266  auto data_read_in =
267  static_cast< size_t >( result - data_start );
268  if ( data_read_in > 0 )
269  {
270  buf_.resize( data_size + data_read_in );
271  }
272  else
273  {
274  buf_.resize( data_size );
275  throw std::runtime_error( "io operation failed" );
276  }
277  }
278 
279  void
280  consume( std::vector< char >::size_type count )
281  {
282  if ( count <= 0 )
283  return;
284  if ( count >= buf_.size( ) )
285  {
286  buf_.clear( );
287  return;
288  }
289  auto current_size = buf_.size( );
290  auto new_head = buf_.data( ) + count;
291 
292  std::move(
293  new_head, buf_.data( ) + current_size, buf_.data( ) );
294  buf_.resize( current_size - count );
295  }
296 
297  public:
298  explicit OwningBufferedReader( Reader&& r )
299  : r_{ std::move( r ) }, buf_{}
300  {
301  }
302  // BufferedReader(Reader&& r): r_{std::move(r)}, buf_{} {}
303  // BufferedReader(Reader&& r): r_{std::move(r)} {}
304 
305  // BufferedReader(BufferedReader<Reader, ReaderCaptureType>&&
306  // other): r_{std::move(other.r_)}, buf_{std::move(other.buf)} {}
307 
313  void
314  write_all( const char* start, const char* end )
315  {
316  r_.write_all( start, end );
317  }
318 
327  char*
328  read_available( char* start, char* end )
329  {
330  size_t len_requested = end - start;
331 
332  if ( buf_.empty( ) )
333  fill_buffer( );
334  if ( buf_.size( ) < len_requested )
335  len_requested = buf_.size( );
336  std::copy( buf_.data( ), buf_.data( ) + len_requested, start );
337  consume( len_requested );
338  return start + len_requested;
339  }
340 
347  char*
348  read_exactly( char* start, const char* end )
349  {
350  size_t len_requested = end - start;
351 
352  while ( buf_.size( ) < len_requested )
353  fill_buffer( );
354  std::copy( buf_.data( ), buf_.data( ) + len_requested, start );
355  consume( len_requested );
356  return start + len_requested;
357  }
358 
370  template < typename It >
371  std::vector< char >
372  read_until( It begin_set, It end_set )
373  {
374  std::vector< char > result;
375 
376  decltype( buf_.size( ) ) cur = 0;
377  auto remaining = buf_.size( );
378  for ( bool match = false; !match; ++cur, --remaining )
379  {
380  if ( remaining == 0 )
381  {
382  fill_buffer( );
383  remaining = buf_.size( ) - cur;
384  }
385 
386  if ( std::find( begin_set, end_set, buf_.at( cur ) ) !=
387  end_set )
388  {
389  match = true;
390  }
391  }
392  result.resize( cur );
393  std::copy(
394  buf_.begin( ), buf_.begin( ) + cur, result.begin( ) );
395  consume( cur );
396  return result;
397  }
398 
403  std::vector< char >::size_type
405  {
406  return buf_.size( );
407  }
408  };
409  }
410 }
411 
413 // Tests only after this point.
415 
416 #ifdef _NDS_IMPL_ENABLE_CATCH_TESTS_
417 
418 #include <algorithm>
419 #include <iterator>
420 #include <sstream>
421 
422 #include "socket/socket.hh"
423 #include "tests/dummy_socket.hh"
424 #include "catch.hpp"
425 
426 TEST_CASE( "Create a buffered socket", "[buffered,create]" )
427 {
430 }
431 
432 TEST_CASE( "You can write through a buffered reader", "[buffered,write]" )
433 {
434  using namespace nds_testing;
435 
438 
439  auto hello = std::string( "hello" );
440  b.write_all( hello.data( ), hello.data( ) + hello.length( ) );
441  REQUIRE( s.str( ) == hello );
442 
443  auto world = std::string( " world!" );
444  b.write_all( world.data( ), world.data( ) + world.length( ) );
445  REQUIRE( s.str( ) == hello + world );
446 }
447 
448 TEST_CASE( "You can read from a buffered writter", "[buffered,read]" )
449 {
450  using namespace nds_testing;
451 
452  std::vector< char > dest( 20 );
453 
454  auto hello = std::string{ "hello world!" };
455  auto s = DummySocket( hello );
456 
458 
459  REQUIRE( b.__buffered_bytes( ) == 0 );
460 
461  auto end = b.read_available( dest.data( ), dest.data( ) + dest.size( ) );
462  REQUIRE( b.__buffered_bytes( ) == 0 );
463  REQUIRE( end != dest.data( ) );
464  REQUIRE( end == dest.data( ) + hello.size( ) );
465  auto output = std::string( dest.data( ), end );
466  REQUIRE( output == hello );
467 }
468 
469 TEST_CASE( "You can read the data in small segments from a buffered reader",
470  "[buffered,read]" )
471 {
472  using namespace nds_testing;
473 
474  std::vector< char > dest( 5 );
475 
476  auto hello = std::string{ "hello world!" };
477  auto s = DummySocket( hello );
478 
480 
481  REQUIRE( b.__buffered_bytes( ) == 0 );
482 
483  auto end = b.read_available( dest.data( ), dest.data( ) + dest.size( ) );
484  REQUIRE( b.__buffered_bytes( ) == ( hello.size( ) - dest.size( ) ) );
485  REQUIRE( end != dest.data( ) );
486  REQUIRE( end == dest.data( ) + dest.size( ) );
487  auto output = std::string( dest.data( ), end );
488  REQUIRE( output == hello.substr( 0, dest.size( ) ) );
489 
490  end = b.read_available( dest.data( ), dest.data( ) + dest.size( ) );
491  REQUIRE( b.__buffered_bytes( ) == ( hello.size( ) - 2 * dest.size( ) ) );
492  REQUIRE( end != dest.data( ) );
493  REQUIRE( end == dest.data( ) + dest.size( ) );
494  output = std::string( dest.data( ), end );
495  REQUIRE( output == hello.substr( dest.size( ), dest.size( ) ) );
496 }
497 
498 TEST_CASE( "You can ask for an exact amount of bytes to be returned",
499  "[buffered,read]" )
500 {
501  using namespace nds_testing;
502 
503  std::vector< char > dest( 10 );
504 
505  auto input = std::string{ "0123456789" };
506  auto s = DummySocket( input );
508 
509  auto stride = 4;
510  auto end = b.read_exactly( dest.data( ), dest.data( ) + stride );
511  REQUIRE( b.__buffered_bytes( ) == ( input.size( ) - stride ) );
512  REQUIRE( end == dest.data( ) + stride );
513 
514  REQUIRE_THROWS(
515  b.read_exactly( dest.data( ), dest.data( ) + dest.size( ) ) );
516 }
517 
518 TEST_CASE( "You can peek at an exact amount of data without consuming it",
519  "[buffered,read]" )
520 {
521  using namespace nds_testing;
522 
523  std::vector< char > dest( 10 );
524  auto input = std::string{ "0123456789" };
525 
526  auto s = DummySocket( input );
528 
529  auto stride = 4;
530  auto end = b.peek( dest.data( ), dest.data( ) + stride );
531  REQUIRE( b.__buffered_bytes( ) == input.size( ) );
532  REQUIRE( end == dest.data( ) + stride );
533  REQUIRE( std::equal( dest.data( ), dest.data( ) + stride, input.data( ) ) );
534 
535  // clear the buffer
536  std::fill( dest.begin( ), dest.end( ), 0 );
537  REQUIRE(
538  !std::equal( dest.data( ), dest.data( ) + stride, input.data( ) ) );
539 
540  end = b.read_exactly( dest.data( ), dest.data( ) + stride );
541  REQUIRE( b.__buffered_bytes( ) == input.size( ) - stride );
542  REQUIRE( end == dest.data( ) + stride );
543  REQUIRE( std::equal( dest.data( ), dest.data( ) + stride, input.data( ) ) );
544 }
545 
546 TEST_CASE( "You can ask a buffered read to read until a sequence is found",
547  "[buffered,read_until]" )
548 {
549  using namespace nds_testing;
550 
551  auto input = std::string{ "0123456789" };
552  auto s = DummySocket( input );
554 
555  auto terminators = std::string( "59" );
556  auto result = b.read_until( terminators.begin( ), terminators.end( ) );
557  REQUIRE( std::string( result.begin( ), result.end( ) ) ==
558  std::string( "012345" ) );
559 }
560 
561 TEST_CASE( "You can ask a buffered read to read until a sequence is found, "
562  "with a bound",
563  "[buffered,read_until]" )
564 {
565  using namespace nds_testing;
566 
567  std::vector< char > dest( 5 );
568  auto input = std::string{ "0123456789abcdef" };
569  auto s = DummySocket( input );
571 
572  std::fill( dest.begin( ), dest.end( ), 0 );
573  auto terminators = std::string( "29" );
574  auto end = b.read_until(
575  terminators.begin( ), terminators.end( ), dest.begin( ), dest.end( ) );
576  auto length = std::distance( dest.begin( ), end );
577  REQUIRE( length == 3 );
578  REQUIRE( std::string( dest.data( ), length ) == std::string( "012" ) );
579 }
580 
581 TEST_CASE( "A bounded buffered read_until will throw an exception if it cannot "
582  "find a sequence",
583  "[buffered,read_until]" )
584 {
585  using namespace nds_testing;
586 
587  std::vector< char > dest( 5 );
588  auto input = std::string{ "0123456789abcdef" };
589  auto s = DummySocket( input );
591 
592  auto terminators = std::string( "Z" );
593  REQUIRE_THROWS_AS( b.read_until( terminators.begin( ),
594  terminators.end( ),
595  dest.begin( ),
596  dest.end( ) ),
597  std::range_error );
598 }
599 
600 TEST_CASE( "A bounded buffered read_until will throw an exception if it cannot "
601  "find a sequence when input is empty",
602  "[buffered,read_until]" )
603 {
604  using namespace nds_testing;
605 
606  std::vector< char > dest( 5 );
607  auto input = std::string{ "" };
608  auto s = DummySocket( input );
610 
611  auto terminators = std::string( "Z" );
612  REQUIRE_THROWS_AS( b.read_until( terminators.begin( ),
613  terminators.end( ),
614  dest.begin( ),
615  dest.end( ) ),
616  std::runtime_error );
617 }
618 
619 TEST_CASE( "Create an owning buffered socket", "[buffered,create]" )
620 {
623  std::move( s1 )
624  };
625 }
626 
627 TEST_CASE( "You can write through a owning buffered reader",
628  "[buffered,write]" )
629 {
630  using namespace nds_testing;
631 
632  std::vector< char > buf;
633  RecordingDummySocket s( buf );
635  s ) };
636 
637  auto hello = std::string( "hello" );
638  b.write_all( hello.data( ), hello.data( ) + hello.length( ) );
639  REQUIRE( std::string( buf.data( ), buf.size( ) ) == hello );
640 
641  auto world = std::string( " world!" );
642  b.write_all( world.data( ), world.data( ) + world.length( ) );
643  REQUIRE( std::string( buf.data( ), buf.size( ) ) == hello + world );
644 }
645 
646 TEST_CASE( "You can read from a owning buffered writter", "[buffered,read]" )
647 {
648  using namespace nds_testing;
649 
650  std::vector< char > dest( 20 );
651 
652  auto hello = std::string{ "hello world!" };
653  auto s = DummySocket( hello );
654 
656 
657  REQUIRE( b.__buffered_bytes( ) == 0 );
658 
659  auto end = b.read_available( dest.data( ), dest.data( ) + dest.size( ) );
660  REQUIRE( b.__buffered_bytes( ) == 0 );
661  REQUIRE( end != dest.data( ) );
662  REQUIRE( end == dest.data( ) + hello.size( ) );
663  auto output = std::string( dest.data( ), end );
664  REQUIRE( output == hello );
665 }
666 
667 TEST_CASE(
668  "You can read the data in small segments from an owning buffered reader",
669  "[buffered,read]" )
670 {
671  using namespace nds_testing;
672 
673  std::vector< char > dest( 5 );
674 
675  auto hello = std::string{ "hello world!" };
676  auto s = DummySocket( hello );
677 
679 
680  REQUIRE( b.__buffered_bytes( ) == 0 );
681 
682  auto end = b.read_available( dest.data( ), dest.data( ) + dest.size( ) );
683  REQUIRE( b.__buffered_bytes( ) == ( hello.size( ) - dest.size( ) ) );
684  REQUIRE( end != dest.data( ) );
685  REQUIRE( end == dest.data( ) + dest.size( ) );
686  auto output = std::string( dest.data( ), end );
687  REQUIRE( output == hello.substr( 0, dest.size( ) ) );
688 
689  end = b.read_available( dest.data( ), dest.data( ) + dest.size( ) );
690  REQUIRE( b.__buffered_bytes( ) == ( hello.size( ) - 2 * dest.size( ) ) );
691  REQUIRE( end != dest.data( ) );
692  REQUIRE( end == dest.data( ) + dest.size( ) );
693  output = std::string( dest.data( ), end );
694  REQUIRE( output == hello.substr( dest.size( ), dest.size( ) ) );
695 }
696 
697 TEST_CASE( "You can ask for an exact amount of bytes to be returned from an "
698  "owning buffered reader",
699  "[buffered,read]" )
700 {
701  using namespace nds_testing;
702 
703  std::vector< char > dest( 10 );
704 
705  auto input = std::string{ "0123456789" };
706  auto s = DummySocket( input );
708 
709  auto stride = 4;
710  auto end = b.read_exactly( dest.data( ), dest.data( ) + stride );
711  REQUIRE( b.__buffered_bytes( ) == ( input.size( ) - stride ) );
712  REQUIRE( end == dest.data( ) + stride );
713 
714  REQUIRE_THROWS(
715  b.read_exactly( dest.data( ), dest.data( ) + dest.size( ) ) );
716 }
717 
718 TEST_CASE(
719  "You can ask an owning buffered read to read until a sequence is found",
720  "[buffered,read_until]" )
721 {
722  using namespace nds_testing;
723 
724  std::vector< char > dest( 10 );
725 
726  auto input = std::string{ "0123456789" };
727  auto s = DummySocket( input );
729 
730  auto terminators = std::string( "59" );
731  auto result = b.read_until( terminators.begin( ), terminators.end( ) );
732  REQUIRE( std::string( result.begin( ), result.end( ) ) ==
733  std::string( "012345" ) );
734 }
735 
736 #endif // _NDS_IMPL_ENABLE_CATCH_TESTS_
737 
738 #endif // NDS_PROXY_BUFFERED_READER_HH
BufferedReader(Reader &r)
Definition: buffered_reader.hh:20
Definition: socket.hh:108
void write_all(const char *start, const char *end)
Definition: buffered_reader.hh:314
void fill_buffer()
Definition: buffered_reader.hh:255
void consume(std::vector< char >::size_type count)
Definition: buffered_reader.hh:280
void write_all(const char *start, const char *end)
Definition: buffered_reader.hh:35
std::vector< char >::size_type __buffered_bytes() const
Definition: buffered_reader.hh:404
Reader r_
Definition: buffered_reader.hh:251
OwningBufferedReader(Reader &&r)
Definition: buffered_reader.hh:298
TEST_CASE("daq_strlcpy copies strings safely when buffers are sufficiently large")
Definition: test_bsd_string.cc:9
OutIt read_until(It begin_set, It end_set, OutIt dest_start, OutIt dest_end)
Definition: buffered_reader.hh:153
Reader & r_
Definition: buffered_reader.hh:201
Definition: dummy_socket.hh:45
std::vector< char > read_until(It begin_set, It end_set)
Definition: buffered_reader.hh:112
void consume(std::vector< char >::size_type count)
Definition: buffered_reader.hh:230
std::vector< char > buf_
Definition: buffered_reader.hh:252
char * peek(char *start, const char *end)
Definition: buffered_reader.hh:89
std::vector< char > read_until(It begin_set, It end_set)
Definition: buffered_reader.hh:372
void fill_buffer()
Definition: buffered_reader.hh:205
char * read_exactly(char *start, const char *end)
Definition: buffered_reader.hh:69
std::vector< char >::size_type __buffered_bytes() const
Definition: buffered_reader.hh:195
char * read_available(char *start, char *end)
Definition: buffered_reader.hh:49
char * read_exactly(char *start, const char *end)
Definition: buffered_reader.hh:348
Definition: buffered_reader.hh:249
std::vector< char > buf_
Definition: buffered_reader.hh:202
char * read_available(char *start, char *end)
Definition: buffered_reader.hh:328
Definition: buffered_reader.hh:17
Definition: dummy_socket.hh:14