From 2bcf019773fdea02933c1c344d76aa53cbcb81f1 Mon Sep 17 00:00:00 2001 From: Sipwise Jenkins Builder Date: Tue, 7 Jun 2022 11:41:25 +0200 Subject: [PATCH 01/12] Release new version 10.5.1.0+0~mr10.5.1.0 --- debian/changelog | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/debian/changelog b/debian/changelog index 9f2fab60..3e7e730b 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +ngcp-bulk-processor (10.5.1.0+0~mr10.5.1.0) unstable; urgency=medium + + * New release. + + -- Sipwise Jenkins Builder Tue, 07 Jun 2022 11:41:25 +0200 + ngcp-bulk-processor (10.5.0.0+0~mr10.5.0.0) unstable; urgency=medium [ Rene Krenn ] From 26d32de14791a991b2e0bb1aae509da4ade4e3a6 Mon Sep 17 00:00:00 2001 From: Sipwise Jenkins Builder Date: Wed, 29 Jun 2022 17:25:54 +0200 Subject: [PATCH 02/12] Release new version 10.5.2.0+0~mr10.5.2.0 --- debian/changelog | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/debian/changelog b/debian/changelog index 3e7e730b..98040090 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +ngcp-bulk-processor (10.5.2.0+0~mr10.5.2.0) unstable; urgency=medium + + * New release. + + -- Sipwise Jenkins Builder Wed, 29 Jun 2022 17:25:54 +0200 + ngcp-bulk-processor (10.5.1.0+0~mr10.5.1.0) unstable; urgency=medium * New release. From 1c3478d7885877545a9d840909a519e1b221a627 Mon Sep 17 00:00:00 2001 From: Rene Krenn Date: Tue, 1 Sep 2020 18:17:01 +0200 Subject: [PATCH 03/12] TT#92350 TT#175101 LNP database bulk loader this tool provides loading and updating the ngcp LNP database from customer-specific textfiles with many GBytes and tens of mio. of rows. in a *fast* way. Change-Id: Ie64b1a9c7dfe3251c41dd443ed937cf8da18cb36 (cherry picked from commit d5692307222c5a02aafc4f2142cdc709f79096a3) --- .../Dao/Trunk/billing/lnp_numbers.pm | 59 +++- .../Dao/Trunk/billing/lnp_providers.pm | 114 +++++- lib/NGCP/BulkProcessor/FileProcessor.pm | 16 +- .../BulkProcessor/Projects/ETL/Lnp/Dao/lnp.pm | 326 ++++++++++++++++++ .../ETL/Lnp/FileProcessors/NumbersFile.pm | 66 ++++ .../BulkProcessor/Projects/ETL/Lnp/Import.pm | 190 ++++++++++ .../Projects/ETL/Lnp/ProcessLnp.pm | 322 +++++++++++++++++ .../Projects/ETL/Lnp/ProjectConnectorPool.pm | 92 +++++ .../Projects/ETL/Lnp/Settings.pm | 218 ++++++++++++ .../BulkProcessor/Projects/ETL/Lnp/config.cfg | 62 ++++ .../BulkProcessor/Projects/ETL/Lnp/process.pl | 291 ++++++++++++++++ 11 files changed, 1752 insertions(+), 4 deletions(-) create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Dao/lnp.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Lnp/FileProcessors/NumbersFile.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Import.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Lnp/ProcessLnp.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Lnp/ProjectConnectorPool.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Settings.pm create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Lnp/config.cfg create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Lnp/process.pl diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/lnp_numbers.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/lnp_numbers.pm index 3b70ebff..c07d7a16 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/lnp_numbers.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/lnp_numbers.pm @@ -6,6 +6,7 @@ use strict; use NGCP::BulkProcessor::Logging qw( getlogger rowinserted + rowsdeleted ); use NGCP::BulkProcessor::ConnectorPool qw( @@ -18,6 +19,7 @@ use NGCP::BulkProcessor::SqlProcessor qw( update_record delete_record copy_row + insert_stmt ); use NGCP::BulkProcessor::SqlRecord qw(); @@ -29,21 +31,31 @@ our @EXPORT_OK = qw( insert_row update_row delete_row + + delete_numbers + + getinsertstatement findby_lnpproviderid_number countby_lnpproviderid_number + + @fieldnames ); my $tablename = 'lnp_numbers'; my $get_db = \&get_billing_db; -my $expected_fieldnames = [ +our @fieldnames = ( 'id', 'number', 'routing_number', 'lnp_provider_id', 'start', 'end', +); + +my $expected_fieldnames = [ + @fieldnames ]; my $indexes = {}; @@ -130,6 +142,43 @@ sub delete_row { } +sub delete_numbers { + + my ($xa_db,$numbers) = @_; + + check_table(); + my $db = &$get_db(); + $xa_db //= $db; + my $table = $db->tableidentifier($tablename); + + my $stmt = ''; + my @params = (); + if (defined $numbers and 'HASH' eq ref $numbers) { + foreach my $in (keys %$numbers) { + my @values = (defined $numbers->{$in} and 'ARRAY' eq ref $numbers->{$in} ? @{$numbers->{$in}} : ($numbers->{$in})); + $stmt .= ' AND ' if length($stmt); + $stmt .= $db->columnidentifier('number') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')'; + push(@params,@values); + } + } elsif (defined $numbers and length($numbers) > 0) { + $stmt = $db->columnidentifier('number') . ' = ?'; + push(@params,$numbers); + } + + $stmt = ' WHERE ' . $stmt if length($stmt); + $stmt = 'DELETE FROM ' . $table . $stmt; + + my $count; + if ($count = $xa_db->db_do($stmt,@params)) { + rowsdeleted($db,$tablename,$count,$count,getlogger(__PACKAGE__)); + return 1; + } else { + rowsdeleted($db,$tablename,0,0,getlogger(__PACKAGE__)); + return 0; + } + +} + sub insert_row { my $db = &$get_db(); @@ -185,6 +234,14 @@ sub buildrecords_fromrows { } +sub getinsertstatement { + + my ($insert_ignore) = @_; + check_table(); + return insert_stmt($get_db,__PACKAGE__,$insert_ignore); + +} + sub gettablename { return $tablename; diff --git a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/lnp_providers.pm b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/lnp_providers.pm index 4f2c7307..f438b82b 100644 --- a/lib/NGCP/BulkProcessor/Dao/Trunk/billing/lnp_providers.pm +++ b/lib/NGCP/BulkProcessor/Dao/Trunk/billing/lnp_providers.pm @@ -5,6 +5,8 @@ use strict; use NGCP::BulkProcessor::Logging qw( getlogger + rowinserted + rowsdeleted ); use NGCP::BulkProcessor::ConnectorPool qw( @@ -14,6 +16,9 @@ use NGCP::BulkProcessor::ConnectorPool qw( use NGCP::BulkProcessor::SqlProcessor qw( checktableinfo copy_row + insert_record + update_record + delete_record ); use NGCP::BulkProcessor::SqlRecord qw(); @@ -22,8 +27,15 @@ our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); our @EXPORT_OK = qw( gettablename check_table - + + insert_row + update_row + delete_row + findby_prefix + findby_nameprefixauthoritativeskiprewrite + + countby_name ); my $tablename = 'lnp_providers'; @@ -77,6 +89,106 @@ sub findby_prefix { } +sub findby_nameprefixauthoritativeskiprewrite { + + my ($name,$prefix,$authoritative,$skip_rewrite,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT * FROM ' . $table; + my @params = (); + my @terms = (); + push(@terms,$db->columnidentifier('name') . ' = ?'); + push(@params,$name); + push(@terms,$db->columnidentifier('prefix') . ' = ?'); + push(@params,$prefix); + push(@terms,$db->columnidentifier('authoritative') . ' = ?'); + push(@params,$authoritative); + push(@terms,$db->columnidentifier('skip_rewrite') . ' = ?'); + push(@params,$skip_rewrite); + if ((scalar @terms) > 0) { + $stmt .= ' WHERE ' . join(' AND ',@terms); + } + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub countby_name { + + my ($name,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(*) FROM ' . $table; + my @terms = (); + my @params = (); + if (defined $name) { + push(@terms,$db->columnidentifier('name') . ' = ?'); + push(@params,$name); + } + if ((scalar @terms) > 0) { + $stmt .= ' WHERE ' . join(' AND ',@terms); + } + + return $db->db_get_value($stmt,@params); + +} + +sub update_row { + + my ($xa_db,$data) = @_; + + check_table(); + return update_record($get_db,$xa_db,__PACKAGE__,$data); + +} + +sub delete_row { + + my ($xa_db,$data) = @_; + + check_table(); + return delete_record($get_db,$xa_db,__PACKAGE__,$data); + +} + +sub insert_row { + + my $db = &$get_db(); + my $xa_db = shift // $db; + if ('HASH' eq ref $_[0]) { + my ($data,$insert_ignore) = @_; + check_table(); + if (insert_record($db,$xa_db,__PACKAGE__,$data,$insert_ignore,$insert_unique_fields)) { + return $xa_db->db_last_insert_id(); + } + } else { + my %params = @_; + my ($prefix,$name) = @params{qw/ + prefix + name + /}; + + if ($xa_db->db_do('INSERT INTO ' . $db->tableidentifier($tablename) . ' (' . + $db->columnidentifier('prefix') . ', ' . + $db->columnidentifier('name') . ') VALUES (' . + '?, ?)', + $prefix,$name, + )) { + rowinserted($db,$tablename,getlogger(__PACKAGE__)); + return $xa_db->db_last_insert_id(); + } + } + return undef; + +} + sub buildrecords_fromrows { my ($rows,$load_recursive) = @_; diff --git a/lib/NGCP/BulkProcessor/FileProcessor.pm b/lib/NGCP/BulkProcessor/FileProcessor.pm index 1a094fa6..c0c22c2b 100644 --- a/lib/NGCP/BulkProcessor/FileProcessor.pm +++ b/lib/NGCP/BulkProcessor/FileProcessor.pm @@ -254,7 +254,13 @@ sub process { foreach my $line (@lines) { $context->{linesread} += 1; my $row = &$extractfields_code($context,(ref $line ? $line : \$line)); - push(@rowblock,$row) if defined $row; + if (exists $row->[0] and 'ARRAY' eq ref $row->[0]) { + foreach (@$row) { + push(@rowblock,$_) if defined $_; + } + } else { + push(@rowblock,$row) if defined $row; + } } my $realblocksize = scalar @rowblock; if ($realblocksize > 0) { @@ -404,7 +410,13 @@ sub _reader { foreach my $line (@lines) { $context->{linesread} += 1; my $row = &$extractfields_code($context,(ref $line ? $line : \$line)); - push(@rowblock,shared_clone($row)) if defined $row; + if (exists $row->[0] and 'ARRAY' eq ref $row->[0]) { + foreach (@$row) { + push(@rowblock,shared_clone($_)) if defined $_; + } + } else { + push(@rowblock,shared_clone($row)) if defined $row; + } yield(); } my $realblocksize = scalar @rowblock; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Dao/lnp.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Dao/lnp.pm new file mode 100644 index 00000000..3af0c1c6 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Dao/lnp.pm @@ -0,0 +1,326 @@ +package NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp; +use strict; + +## no critic + +use NGCP::BulkProcessor::Projects::ETL::Lnp::ProjectConnectorPool qw( + get_sqlite_db + destroy_all_dbs +); + +use NGCP::BulkProcessor::SqlProcessor qw( + registertableinfo + create_targettable + checktableinfo + copy_row + + insert_stmt + process_table +); + +use NGCP::BulkProcessor::SqlRecord qw(); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::SqlRecord); +our @EXPORT_OK = qw( + create_table + gettablename + check_table + getinsertstatement + getupsertstatement + + @fieldnames + has_rows + + update_delta + countby_delta + + $deleted_delta + $updated_delta + $added_delta + + find_carriers_by_delta + + process_records +); + +my $tablename = 'lnp'; +my $get_db = \&get_sqlite_db; + +our @fieldnames = ( + 'carrier_name', + 'carrier_prefix', + 'number', + 'routing_number', + 'start', + 'end', + 'authoritative', + 'skip_rewrite', + 'type', + #calculated fields at the end! + #'rownum', + #'filenum', + #'filename', +); + +my $expected_fieldnames = [ + @fieldnames, + 'delta', +]; + +# table creation: +my $primarykey_fieldnames = [ 'number' ]; +my $indexes = { + #$tablename . '_number' => [ 'number(32)' ], + #$tablename . '_rownum' => [ 'rownum(11)' ], + $tablename . '_delta' => [ 'delta(7)' ], + $tablename . '_carrier_delta' => [ 'carrier_name(255)', 'carrier_prefix(32)', 'authoritative(1)', 'skip_rewrite(1)', 'delta(7)' ], +}; +#my $fixtable_statements = []; + +our $deleted_delta = 'DELETED'; +our $updated_delta = 'UPDATED'; +our $added_delta = 'ADDED'; + +sub new { + + my $class = shift; + my $self = NGCP::BulkProcessor::SqlRecord->new($class,$get_db, + $tablename,$expected_fieldnames,$indexes); + + copy_row($self,shift,$expected_fieldnames); + + return $self; + +} + +sub create_table { + + my ($truncate) = @_; + + my $db = &$get_db(); + + registertableinfo($db,__PACKAGE__,$tablename,$expected_fieldnames,$indexes,$primarykey_fieldnames); + return create_targettable($db,__PACKAGE__,$db,__PACKAGE__,$tablename,$truncate,0,undef); + +} + +sub find_carriers_by_delta { + + my ($deltas,$load_recursive) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = ''; + my @params = (); + if (defined $deltas and 'HASH' eq ref $deltas) { + foreach my $in (keys %$deltas) { + my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in})); + $stmt .= ' AND ' if length($stmt); + $stmt .= $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')'; + push(@params,@values); + } + } elsif (defined $deltas and length($deltas) > 0) { + $stmt = $db->columnidentifier('delta') . ' = ?'; + push(@params,$deltas); + } + $stmt = ' WHERE ' . $stmt if length($stmt); + + $stmt = 'SELECT * FROM ' . $table . $stmt . ' GROUP BY ' + . $db->columnidentifier('carrier_name') + . ', ' . $db->columnidentifier('carrier_prefix') + . ', ' . $db->columnidentifier('authoritative') + . ', ' . $db->columnidentifier('skip_rewrite'); + + my $rows = $db->db_get_all_arrayref($stmt,@params); + + return buildrecords_fromrows($rows,$load_recursive); + +} + +sub update_delta { + + my ($number,$delta) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'UPDATE ' . $table . ' SET delta = ?'; + my @params = (); + push(@params,$delta); + if (defined $number) { + $stmt .= ' WHERE ' . $db->columnidentifier('number') . ' = ?'; + push(@params,$number); + } + + return $db->db_do($stmt,@params); + +} + +sub countby_delta { + + my ($deltas) = @_; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = ''; + my @params = (); + if (defined $deltas and 'HASH' eq ref $deltas) { + foreach my $in (keys %$deltas) { + my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in})); + $stmt .= ' AND ' if length($stmt); + $stmt .= $db->columnidentifier('delta') . ' ' . $in . ' (' . substr(',?' x scalar @values,1) . ')'; + push(@params,@values); + } + } elsif (defined $deltas and length($deltas) > 0) { + $stmt = $db->columnidentifier('delta') . ' = ?'; + push(@params,$deltas); + } + $stmt = ' WHERE ' . $stmt if length($stmt); + + $stmt = 'SELECT COUNT(*) FROM ' . $table . $stmt; + + return $db->db_get_value($stmt,@params); + +} + +sub has_rows { + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my $stmt = 'SELECT COUNT(1) FROM (SELECT 1 FROM ' . $table . ' LIMIT 1) AS q'; + + return $db->db_get_value($stmt); +} + +sub buildrecords_fromrows { + + my ($rows,$load_recursive) = @_; + + my @records = (); + my $record; + + if (defined $rows and ref $rows eq 'ARRAY') { + foreach my $row (@$rows) { + $record = __PACKAGE__->new($row); + + # transformations go here ... + + push @records,$record; + } + } + + return \@records; + +} + +sub process_records { + + my %params = @_; + my ($process_code, + $static_context, + $init_process_context_code, + $uninit_process_context_code, + $multithreading, + $numofthreads, + $deltas) = @params{qw/ + process_code + static_context + init_process_context_code + uninit_process_context_code + multithreading + numofthreads + deltas + /}; + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + + my @terms = (); + if (defined $deltas and 'HASH' eq ref $deltas) { + foreach my $in (keys %$deltas) { + my @values = (defined $deltas->{$in} and 'ARRAY' eq ref $deltas->{$in} ? @{$deltas->{$in}} : ($deltas->{$in})); + push(@terms,$db->columnidentifier('delta') . ' ' . $in . ' ("' . join('","',@values) . '")'); + } + } elsif (defined $deltas and length($deltas) > 0) { + push(@terms,$db->columnidentifier('delta') . ' = "' . $deltas . '"'); + } + + return process_table( + get_db => $get_db, + class => __PACKAGE__, + process_code => sub { + my ($context,$rowblock,$row_offset) = @_; + return &$process_code($context,$rowblock,$row_offset); + }, + static_context => $static_context, + init_process_context_code => $init_process_context_code, + uninit_process_context_code => $uninit_process_context_code, + destroy_reader_dbs_code => \&destroy_all_dbs, + multithreading => $multithreading, + tableprocessing_threads => $numofthreads, + ((scalar @terms) ? ('select' => 'SELECT * FROM ' . $table . ' WHERE ' . join (' AND ',@terms)) : ()), + ((scalar @terms) ? ('selectcount' => 'SELECT COUNT(1) FROM ' . $table . ' WHERE ' . join (' AND ',@terms)) : ()), + ); +} + +sub carrier_hash { + my $self = shift; + return ($self->{carrier_name} // '') . '-' . ($self->{carrier_prefix} // '') + . '-' . ($self->{authoritative} // '') . '-' . ($self->{skip_rewrite} // ''); +} + +sub getinsertstatement { + + my ($insert_ignore) = @_; + check_table(); + return insert_stmt($get_db,__PACKAGE__,$insert_ignore); + +} + +sub getupsertstatement { + + check_table(); + my $db = &$get_db(); + my $table = $db->tableidentifier($tablename); + my $upsert_stmt = 'INSERT OR REPLACE INTO ' . $table . ' (' . + join(', ',map { local $_ = $_; $_ = $db->columnidentifier($_); $_; } @$expected_fieldnames) . ')'; + my @values = (); + foreach my $fieldname (@$expected_fieldnames) { + if ('delta' eq $fieldname) { + my $stmt = 'SELECT \'' . $updated_delta . '\' FROM ' . $table . ' WHERE ' . + $db->columnidentifier('number') . ' = ?'; + push(@values,'COALESCE((' . $stmt . '), \'' . $added_delta . '\')'); + } else { + push(@values,'?'); + } + } + $upsert_stmt .= ' VALUES (' . join(',',@values) . ')'; + return $upsert_stmt; + +} + +sub gettablename { + + return $tablename; + +} + +sub check_table { + + return checktableinfo($get_db, + __PACKAGE__,$tablename, + $expected_fieldnames, + $indexes); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/FileProcessors/NumbersFile.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/FileProcessors/NumbersFile.pm new file mode 100644 index 00000000..f36394ae --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/FileProcessors/NumbersFile.pm @@ -0,0 +1,66 @@ +package NGCP::BulkProcessor::Projects::ETL::Lnp::FileProcessors::NumbersFile; +use strict; + +## no critic + +use Encode qw(decode); + +use NGCP::BulkProcessor::Logging qw( + getlogger +); +use NGCP::BulkProcessor::LogError qw( + fileprocessingerror + fileprocessingwarn +); + +use NGCP::BulkProcessor::Projects::ETL::Lnp::Settings qw( + $expand_numbers_code +); + +use NGCP::BulkProcessor::FileProcessor; + +use NGCP::BulkProcessor::Array qw(contains); + +require Exporter; +our @ISA = qw(Exporter NGCP::BulkProcessor::FileProcessor); +our @EXPORT_OK = qw(); + +my $lineseparator = '\\r\\n|\\r|\\n|\\s'; #\\n\\r +my $default_encoding = 'UTF-8'; + +my $buffersize = 1000 * 1024; +my $threadqueuelength = 10; +my $default_numofthreads = 3; +#my $multithreading = 0; +my $blocksize = 100; + +sub new { + + my $class = shift; + + my $self = NGCP::BulkProcessor::FileProcessor->new(@_); + + $self->{numofthreads} = shift // $default_numofthreads; + $self->{line_separator} = $lineseparator; + $self->{field_separator} = undef; + $self->{encoding} = shift // $default_encoding; + $self->{buffersize} = $buffersize; + $self->{threadqueuelength} = $threadqueuelength; + #$self->{multithreading} = $multithreading; + $self->{blocksize} = $blocksize; + + bless($self,$class); + + #restdebug($self,__PACKAGE__ . ' file processor created',getlogger(__PACKAGE__)); + + return $self; + +} + +sub extractfields { + my ($context,$line_ref) = @_; + return $expand_numbers_code->($context,$$line_ref); + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Import.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Import.pm new file mode 100644 index 00000000..5d9fb8f1 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Import.pm @@ -0,0 +1,190 @@ +package NGCP::BulkProcessor::Projects::ETL::Lnp::Import; +use strict; + +## no critic + +use threads::shared qw(); + +#use Encode qw(); + +use NGCP::BulkProcessor::Projects::ETL::Lnp::Settings qw( + $import_multithreading + + $lnp_filename + $lnp_rownum_start + $lnp_import_numofthreads + $ignore_lnp_unique + $lnp_import_single_row_txn + + $expand_numbers_code + + $skip_errors + +); +use NGCP::BulkProcessor::Logging qw ( + getlogger + processing_info + processing_debug +); +use NGCP::BulkProcessor::LogError qw( + fileprocessingwarn + fileprocessingerror +); + +use NGCP::BulkProcessor::FileProcessors::CSVFileSimple qw(); +use NGCP::BulkProcessor::Projects::ETL::Lnp::FileProcessors::NumbersFile qw(); + +use NGCP::BulkProcessor::Projects::ETL::Lnp::ProjectConnectorPool qw( + get_sqlite_db + destroy_all_dbs +); + +use NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp qw(); + +use NGCP::BulkProcessor::Utils qw(threadid trim); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + load_file +); + +sub load_file { + + my $result = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::create_table(0); + + my $importer; + if (defined $expand_numbers_code) { + $importer = NGCP::BulkProcessor::Projects::ETL::Lnp::FileProcessors::NumbersFile->new($lnp_import_numofthreads); + } else { + $importer = NGCP::BulkProcessor::FileProcessors::CSVFileSimple->new($lnp_import_numofthreads); + } + + my $upsert = _lnp_reset_delta(); + + destroy_all_dbs(); #close all db connections before forking.. + my $warning_count :shared = 0; + return ($result && $importer->process( + file => $lnp_filename, + process_code => sub { + my ($context,$rows,$row_offset) = @_; + my $rownum = $row_offset; + $context->{lnp_rows} = []; + foreach my $row (@$rows) { + $rownum++; + next if (defined $lnp_rownum_start and $rownum < $lnp_rownum_start); + next if (scalar @$row) == 0; + #$row = [ map { local $_ = $_; trim($_); $_ =~ s/^"//; $_ =~ s/"$//r; } @$row ]; + my $record = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp->new([ + map { local $_ = $_; trim($_); $_ =~ s/^"//; $_ =~ s/"$//r; } @$row + ]); + #$record->{number} = $record->{cc} . $record->{ac} . $record->{sn}; + + my %r = %$record; my @row_ext = @r{@NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::fieldnames}; + if ($context->{upsert}) { + push(@row_ext,$record->{number}); + } else { + push(@row_ext,$NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::added_delta); + } + push(@{$context->{lnp_rows}},\@row_ext); + if ($lnp_import_single_row_txn and (scalar @{$context->{lnp_rows}}) > 0) { + while (defined (my $lnp_row = shift @{$context->{lnp_rows}})) { + if ($skip_errors) { + eval { _insert_lnp_rows($context,[$lnp_row]); }; + _warn($context,$@) if $@; + } else { + _insert_lnp_rows($context,[$lnp_row]); + } + } + } + } + + if (not $lnp_import_single_row_txn and (scalar @{$context->{lnp_rows}}) > 0) { + if ($skip_errors) { + eval { _insert_lnp_rows($context,$context->{lnp_rows}); }; + _warn($context,$@) if $@; + } else { + _insert_lnp_rows($context,$context->{lnp_rows}); + } + } + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + $context->{db} = &get_sqlite_db(); + $context->{upsert} = $upsert; + $context->{error_count} = 0; + $context->{warning_count} = 0; + }, + uninit_process_context_code => sub { + my ($context)= @_; + undef $context->{db}; + destroy_all_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + multithreading => $import_multithreading, + ),$warning_count); + +} + +sub _insert_lnp_rows { + my ($context,$lnp_rows) = @_; + $context->{db}->db_do_begin(($context->{upsert} ? + NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::getupsertstatement() + : NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::getinsertstatement($ignore_lnp_unique)), + ); + eval { + $context->{db}->db_do_rowblock($lnp_rows); + $context->{db}->db_finish(); + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_finish(1); + }; + die($err); + } +} + +sub _lnp_reset_delta { + my $upsert = 0; + if (NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::has_rows()) { + processing_info(threadid(),'resetting delta of ' . + NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::update_delta(undef, + $NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::deleted_delta) . + ' lnp records',getlogger(__PACKAGE__)); + $upsert |= 1; + } + return $upsert; +} + +sub _error { + + my ($context,$message) = @_; + $context->{error_count} = $context->{error_count} + 1; + fileprocessingerror($context->{filename},$message,getlogger(__PACKAGE__)); + +} + +sub _warn { + + my ($context,$message) = @_; + $context->{warning_count} = $context->{warning_count} + 1; + fileprocessingwarn($context->{filename},$message,getlogger(__PACKAGE__)); + +} + +sub _info { + + my ($context,$message,$debug) = @_; + if ($debug) { + processing_debug($context->{tid},$message,getlogger(__PACKAGE__)); + } else { + processing_info($context->{tid},$message,getlogger(__PACKAGE__)); + } +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/ProcessLnp.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/ProcessLnp.pm new file mode 100644 index 00000000..1b1dab12 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/ProcessLnp.pm @@ -0,0 +1,322 @@ +package NGCP::BulkProcessor::Projects::ETL::Lnp::ProcessLnp; +use strict; + +## no critic + +use threads::shared qw(); + +use NGCP::BulkProcessor::Projects::ETL::Lnp::Settings qw( + + $skip_errors + + $create_lnp_multithreading + $create_lnp_numofthreads + + $delete_lnp_multithreading + $delete_lnp_numofthreads + + $ignore_lnp_numbers_unique + $lnp_numbers_single_row_txn + + $lnp_numbers_batch_delete +); + +use NGCP::BulkProcessor::Logging qw ( + getlogger + processing_info + processing_debug +); +use NGCP::BulkProcessor::LogError qw( + rowprocessingerror + rowprocessingwarn +); + +use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers qw(); + +use NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp qw(); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_xa_db +); + +use NGCP::BulkProcessor::Projects::ETL::Lnp::ProjectConnectorPool qw( + get_sqlite_db + destroy_all_dbs + ping_all_dbs +); + +use NGCP::BulkProcessor::Utils qw(threadid); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + create_lnp_numbers + delete_lnp_numbers +); + +sub create_lnp_numbers { + + my $static_context = {}; + my $result = _create_lnp_numbers_checks($static_context); + + destroy_all_dbs(); + my $warning_count :shared = 0; + my $result = $result && NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::process_records( + static_context => $static_context, + deltas => $NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::added_delta, + process_code => sub { + my ($context,$records,$row_offset) = @_; + ping_all_dbs(); + foreach my $row (@$records) { + my $lnp = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp->new($row); + my $lnp_number = NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers->new({ %$lnp }); + my $lnp_provider = $context->{carrier_map}->{ + $lnp->carrier_hash() + }; + $lnp_number->{lnp_provider_id} = $lnp_provider->{id}; + + my %r = %$lnp_number; my @row_ext = @r{@NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::fieldnames}; + + push(@{$context->{lnp_numbers}},\@row_ext); + if ($lnp_numbers_single_row_txn and (scalar @{$context->{lnp_numbers}}) > 0) { + while (defined (my $lnp_number = shift @{$context->{lnp_numbers}})) { + if ($skip_errors) { + eval { _insert_lnp_numbers($context,[$lnp_number]); }; + _warn($context,$@) if $@; + } else { + _insert_lnp_numbers($context,[$lnp_number]); + } + } + } + } + + if (not $lnp_numbers_single_row_txn and (scalar @{$context->{lnp_numbers}}) > 0) { + if ($skip_errors) { + eval { _insert_lnp_numbers($context,$context->{lnp_numbers}); }; + _warn($context,$@) if $@; + } else { + _insert_lnp_numbers($context,$context->{lnp_numbers}); + } + } + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + $context->{db} = &get_xa_db(); + $context->{error_count} = 0; + $context->{warning_count} = 0; + $context->{lnp_numbers} = []; + }, + uninit_process_context_code => sub { + my ($context)= @_; + undef $context->{db}; + destroy_all_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + multithreading => $create_lnp_multithreading, + numofthreads => $create_lnp_numofthreads, + ); + + return ($result,$warning_count); + +} + + +sub _create_lnp_numbers_checks { + + my $context = shift; + my $result = 1; + + $context->{carrier_map} = {}; + my $carriers = []; + eval { + $carriers = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::find_carriers_by_delta($NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::added_delta); + }; + if ($@) { + $result = 0; #even in skip-error mode.. + } else { + foreach my $carrier (@$carriers) { + my $lp = { + name => $carrier->{carrier_name}, + prefix => ($carrier->{carrier_prefix} // ''), + authoritative => ($carrier->{authoritative} // 0), + skip_rewrite => ($carrier->{skip_rewrite} // 0), + }; + my $lnp_provider = NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::findby_nameprefixauthoritativeskiprewrite( + $lp->{name}, + $lp->{prefix}, + $lp->{authoritative}, + $lp->{skip_rewrite}, + )->[0]; + if ($lnp_provider) { + processing_info(threadid(),"lnp provider '$lnp_provider->{name}' found",getlogger(__PACKAGE__)); + } else { + $lnp_provider = { %$lp }; + $lnp_provider->{id} = NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::insert_row(undef,$lp); + processing_info(threadid(),"lnp provider '$lnp_provider->{name}' created",getlogger(__PACKAGE__)); + } + $context->{carrier_map}->{ + $carrier->carrier_hash() + } = $lnp_provider; + } + } + + return $result; +} + + +sub _insert_lnp_numbers { + my ($context,$lnp_numbers) = @_; + $context->{db}->db_do_begin( + NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::getinsertstatement($ignore_lnp_numbers_unique), + ); + eval { + $context->{db}->db_do_rowblock($lnp_numbers); + $context->{db}->db_finish(); + }; + my $err = $@; + if ($err) { + eval { + $context->{db}->db_finish(1); + }; + die($err); + } +} + + +sub delete_lnp_numbers { + + my $static_context = {}; + my $result = 1; + + destroy_all_dbs(); + my $warning_count :shared = 0; + my $result = $result && NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::process_records( + static_context => $static_context, + deltas => $NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::deleted_delta, + process_code => sub { + my ($context,$records,$row_offset) = @_; + ping_all_dbs(); + foreach my $row (@$records) { + my $lnp = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp->new($row); + push(@{$context->{numbers}},$lnp->{number}); + if (not $lnp_numbers_batch_delete and (scalar @{$context->{numbers}}) > 0) { + while (defined (my $number = shift @{$context->{numbers}})) { + if ($skip_errors) { + eval { + NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::delete_numbers($context->{db},$number); + }; + } else { + NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::delete_numbers($context->{db},$number); + } + } + } + } + + if ($lnp_numbers_batch_delete and (scalar @{$context->{numbers}}) > 0) { + if ($skip_errors) { + eval { + NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::delete_numbers($context->{db},{ + 'IN' => $context->{numbers}, + }); + }; + } else { + NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::delete_numbers($context->{db},{ + 'IN' => $context->{numbers}, + }); + } + } + return 1; + }, + init_process_context_code => sub { + my ($context)= @_; + $context->{db} = &get_xa_db(); + $context->{error_count} = 0; + $context->{warning_count} = 0; + $context->{numbers} = []; + }, + uninit_process_context_code => sub { + my ($context)= @_; + undef $context->{db}; + destroy_all_dbs(); + { + lock $warning_count; + $warning_count += $context->{warning_count}; + } + }, + multithreading => $create_lnp_multithreading, + numofthreads => $create_lnp_numofthreads, + ) && _delete_lnp_providers($static_context); + + return ($result,$warning_count); + +} + + +sub _delete_lnp_providers { + + my $context = shift; + my $result = 1; + + my $carriers = []; + eval { + $carriers = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::find_carriers_by_delta($NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::deleted_delta); + }; + if ($@) { + $result = 0; #even in skip-error mode.. + } else { + foreach my $carrier (@$carriers) { + my $lp = { + name => $carrier->{carrier_name}, + prefix => ($carrier->{carrier_prefix} // ''), + authoritative => ($carrier->{authoritative} // 0), + skip_rewrite => ($carrier->{skip_rewrite} // 0), + }; + foreach my $lnp_provider (@{NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::findby_nameprefixauthoritativeskiprewrite( + $lp->{name}, + $lp->{prefix}, + $lp->{authoritative}, + $lp->{skip_rewrite}, + )}) { + NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::delete_row(undef,$lnp_provider); + processing_info(threadid(),"lnp provider '$lnp_provider->{name}' removed",getlogger(__PACKAGE__)); + } + } + } + + return $result; +} + + +sub _error { + + my ($context,$message) = @_; + $context->{error_count} = $context->{error_count} + 1; + rowprocessingerror($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + +} + +sub _warn { + + my ($context,$message) = @_; + $context->{warning_count} = $context->{warning_count} + 1; + rowprocessingwarn($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + +} + +sub _info { + + my ($context,$message,$debug) = @_; + if ($debug) { + processing_debug($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + } else { + processing_info($context->{tid} // threadid(),$message,getlogger(__PACKAGE__)); + } + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/ProjectConnectorPool.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/ProjectConnectorPool.pm new file mode 100644 index 00000000..318c8b19 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/ProjectConnectorPool.pm @@ -0,0 +1,92 @@ +package NGCP::BulkProcessor::Projects::ETL::Lnp::ProjectConnectorPool; +use strict; + +## no critic + +use File::Basename; +use Cwd; +use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../'); + +use NGCP::BulkProcessor::Projects::ETL::Lnp::Settings qw( + $sqlite_db_file +); + +use NGCP::BulkProcessor::ConnectorPool qw( + get_connectorinstancename + ping +); + +use NGCP::BulkProcessor::SqlConnectors::MySQLDB; +use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw($staticdbfilemode); +#use NGCP::BulkProcessor::RestConnectors::NGCPRestApi; + +use NGCP::BulkProcessor::SqlProcessor qw(cleartableinfo); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + + get_sqlite_db + sqlite_db_tableidentifier + + destroy_dbs + destroy_all_dbs + + ping_dbs + ping_all_dbs +); + +my $sqlite_dbs = {}; + +sub get_sqlite_db { + + my ($instance_name,$reconnect) = @_; + my $name = get_connectorinstancename($instance_name); #threadid(); #shift; + + if (not defined $sqlite_dbs->{$name}) { + $sqlite_dbs->{$name} = NGCP::BulkProcessor::SqlConnectors::SQLiteDB->new($instance_name); #$name); + if (not defined $reconnect) { + $reconnect = 1; + } + } + if ($reconnect) { + $sqlite_dbs->{$name}->db_connect($staticdbfilemode,$sqlite_db_file); + } + + return $sqlite_dbs->{$name}; + +} + +sub sqlite_db_tableidentifier { + + my ($get_target_db,$tablename) = @_; + my $target_db = (ref $get_target_db eq 'CODE') ? &$get_target_db() : $get_target_db; + return $target_db->getsafetablename(NGCP::BulkProcessor::SqlConnectors::SQLiteDB::get_tableidentifier($tablename,$staticdbfilemode,$sqlite_db_file)); + +} + +sub ping_dbs { + +} + +sub ping_all_dbs { + ping_dbs(); + NGCP::BulkProcessor::ConnectorPool::ping_dbs(); +} + +sub destroy_dbs { + + foreach my $name (keys %$sqlite_dbs) { + cleartableinfo($sqlite_dbs->{$name}); + undef $sqlite_dbs->{$name}; + delete $sqlite_dbs->{$name}; + } + +} + +sub destroy_all_dbs() { + destroy_dbs(); + NGCP::BulkProcessor::ConnectorPool::destroy_dbs(); +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Settings.pm b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Settings.pm new file mode 100644 index 00000000..610755ee --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/Settings.pm @@ -0,0 +1,218 @@ +package NGCP::BulkProcessor::Projects::ETL::Lnp::Settings; +use strict; + +## no critic + +use threads::shared qw(); + +use File::Basename qw(fileparse); + +use NGCP::BulkProcessor::Globals qw( + $working_path + $enablemultithreading + $cpucount + create_path +); + +use NGCP::BulkProcessor::Logging qw( + getlogger + scriptinfo + configurationinfo +); + +use NGCP::BulkProcessor::LogError qw( + fileerror + filewarn + configurationwarn + configurationerror +); + +use NGCP::BulkProcessor::LoadConfig qw( + split_tuple + parse_regexp +); +use NGCP::BulkProcessor::Utils qw(prompt); + +use NGCP::BulkProcessor::Array qw(contains); + +require Exporter; +our @ISA = qw(Exporter); +our @EXPORT_OK = qw( + update_settings + + $sqlite_db_file + + check_dry + + $input_path + + $defaultsettings + $defaultconfig + + $dry + $skip_errors + $force + + $import_multithreading + + $lnp_filename + $lnp_rownum_start + $lnp_import_numofthreads + $ignore_lnp_unique + $lnp_import_single_row_txn + + $expand_numbers_code + + $create_lnp_multithreading + $create_lnp_numofthreads + + $delete_lnp_multithreading + $delete_lnp_numofthreads + + $ignore_lnp_numbers_unique + $lnp_numbers_single_row_txn + + $lnp_numbers_batch_delete +); + +our $defaultconfig = 'config.cfg'; +our $defaultsettings = 'settings.yml'; + +our $input_path = $working_path . 'input/'; + +our $force = 0; +our $dry = 0; +our $skip_errors = 0; + +our $sqlite_db_file = 'sqlite'; + +our $import_multithreading = 1; + +our $lnp_filename = undef; +our $lnp_rownum_start = 2; +our $lnp_import_numofthreads = $cpucount; +our $ignore_lnp_unique = 0; +our $lnp_import_single_row_txn = 0; +our $expand_numbers_code = undef; + +our $create_lnp_multithreading = 1; +our $create_lnp_numofthreads = $cpucount; + +our $delete_lnp_multithreading = 1; +our $delete_lnp_numofthreads = $cpucount; + +our $ignore_lnp_numbers_unique = 0; +our $lnp_numbers_single_row_txn = 0; + +our $lnp_numbers_batch_delete = 1; + +sub update_settings { + + my ($data,$configfile) = @_; + + if (defined $data) { + + my $result = 1; + my $regexp_result; + + #&$configurationinfocode("testinfomessage",$configlogger); + + $result &= _prepare_working_paths(1); + + $sqlite_db_file = $data->{sqlite_db_file} if exists $data->{sqlite_db_file}; + + $lnp_filename = _get_import_filename($lnp_filename,$data,'lnp_filename'); + unless ($lnp_filename and -e $lnp_filename) { + configurationerror($configfile,"invalid lnp filename",getlogger(__PACKAGE__)); + } + $lnp_rownum_start = $data->{lnp_rownum_start} if exists $data->{lnp_rownum_start}; + $lnp_import_single_row_txn = $data->{lnp_import_single_row_txn} if exists $data->{lnp_import_single_row_txn}; + $ignore_lnp_unique = $data->{ignore_lnp_unique} if exists $data->{ignore_lnp_unique}; + + $import_multithreading = $data->{import_multithreading} if exists $data->{import_multithreading}; + $lnp_import_numofthreads = _get_numofthreads($lnp_import_numofthreads,$data,'lnp_import_numofthreads'); + + $dry = $data->{dry} if exists $data->{dry}; + $skip_errors = $data->{skip_errors} if exists $data->{skip_errors}; + + $expand_numbers_code = $data->{expand_numbers} if exists $data->{expand_numbers}; + if (defined $expand_numbers_code and 'CODE' ne ref $expand_numbers_code) { + configurationerror($configfile,"expand_numbers coderef required",getlogger(__PACKAGE__)); + } + + $create_lnp_multithreading = $data->{create_lnp_multithreading} if exists $data->{create_lnp_multithreading}; + $create_lnp_numofthreads = _get_numofthreads($create_lnp_numofthreads,$data,'create_lnp_numofthreads'); + + $delete_lnp_multithreading = $data->{delete_lnp_multithreading} if exists $data->{delete_lnp_multithreading}; + $delete_lnp_numofthreads = _get_numofthreads($delete_lnp_numofthreads,$data,'delete_lnp_numofthreads'); + + $ignore_lnp_numbers_unique = $data->{ignore_lnp_numbers_unique} if exists $data->{ignore_lnp_numbers_unique}; + $lnp_numbers_single_row_txn = $data->{lnp_numbers_single_row_txn} if exists $data->{lnp_numbers_single_row_txn}; + + $lnp_numbers_batch_delete = $data->{lnp_numbers_batch_delete} if exists $data->{lnp_numbers_batch_delete}; + + return $result; + + } + return 0; + +} + +sub _prepare_working_paths { + + my ($create) = @_; + my $result = 1; + my $path_result; + + ($path_result,$input_path) = create_path($working_path . 'input',$input_path,$create,\&fileerror,getlogger(__PACKAGE__)); + $result &= $path_result; + + return $result; + +} + +sub _get_numofthreads { + my ($default_value,$data,$key) = @_; + my $numofthreads = $default_value; + $numofthreads = $data->{$key} if exists $data->{$key}; + $numofthreads = $cpucount if $numofthreads > $cpucount; + return $numofthreads; +} + +sub _get_sqlite_db_file { + my ($run,$name) = @_; + return ((defined $run and length($run) > 0) ? $run . '_' : '') . $name; +} + +sub _get_import_filename { + my ($old_value,$data,$key) = @_; + my $import_filename = $old_value; + $import_filename = $data->{$key} if exists $data->{$key}; + if (defined $import_filename and length($import_filename) > 0) { + $import_filename = $input_path . $import_filename unless -e $import_filename; + } + return $import_filename; +} + +sub check_dry { + + if ($dry) { + scriptinfo('running in dry mode - NGCP databases will not be modified',getlogger(__PACKAGE__)); + return 1; + } else { + scriptinfo('NO DRY MODE - NGCP DATABASES WILL BE MODIFIED!',getlogger(__PACKAGE__)); + if (!$force) { + if ('yes' eq lc(prompt("Type 'yes' to proceed: "))) { + return 1; + } else { + return 0; + } + } else { + scriptinfo('force option applied',getlogger(__PACKAGE__)); + return 1; + } + } + +} + +1; diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/config.cfg b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/config.cfg new file mode 100644 index 00000000..7e74fbb7 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/config.cfg @@ -0,0 +1,62 @@ +##general settings: +working_path = /home/rkrenn/temp/lnp +cpucount = 4 +enablemultithreading = 1 + +##gearman/service listener config: +jobservers = 127.0.0.1:4730 + +##NGCP MySQL connectivity - "accounting" db: +accounting_host = 192.168.0.96 +accounting_port = 3306 +accounting_databasename = accounting +accounting_username = root +accounting_password = + +##NGCP MySQL connectivity - "billing" db: +billing_host = 192.168.0.96 +billing_port = 3306 +billing_databasename = billing +billing_username = root +billing_password = + +##NGCP MySQL connectivity - "provisioning" db: +provisioning_host = 192.168.0.96 +provisioning_port = 3306 +provisioning_databasename = provisioning +provisioning_username = root +provisioning_password = + +##NGCP MySQL connectivity - "kamailio" db: +kamailio_host = 192.168.0.96 +kamailio_port = 3306 +kamailio_databasename = kamailio +kamailio_username = root +kamailio_password = + +##NGCP MySQL connectivity - default db for distributed transactions (XA) to connect to: +xa_host = 192.168.0.96 +xa_port = 3306 +xa_databasename = ngcp +xa_username = root +xa_password = + +##NGCP REST-API connectivity: +ngcprestapi_uri = https://127.0.0.1:1443 +ngcprestapi_username = administrator +ngcprestapi_password = administrator +ngcprestapi_realm = api_admin_http + +##sending email: +emailenable = 0 +erroremailrecipient = +warnemailrecipient = +completionemailrecipient = rkrenn@sipwise.com +doneemailrecipient = + +##logging: +fileloglevel = INFO +#DEBUG +screenloglevel = INFO +#INFO +emailloglevel = OFF diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/process.pl b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/process.pl new file mode 100644 index 00000000..d31d8e60 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/process.pl @@ -0,0 +1,291 @@ +use strict; + +## no critic + +use File::Basename; +use Cwd; +use lib Cwd::abs_path(File::Basename::dirname(__FILE__) . '/../../../../../'); + +use Getopt::Long qw(GetOptions); +use Fcntl qw(LOCK_EX LOCK_NB); + +use NGCP::BulkProcessor::Globals qw(); +use NGCP::BulkProcessor::Projects::ETL::Lnp::Settings qw( + update_settings + + check_dry + + $defaultsettings + $defaultconfig + $dry + $skip_errors + $force + +); + +use NGCP::BulkProcessor::Logging qw( + init_log + getlogger + $attachmentlogfile + scriptinfo + cleanuplogfiles + $currentlogfile +); +use NGCP::BulkProcessor::LogError qw ( + completion + done + scriptwarn + scripterror + filewarn + fileerror +); +use NGCP::BulkProcessor::LoadConfig qw( + load_config + $SIMPLE_CONFIG_TYPE + $YAML_CONFIG_TYPE + $ANY_CONFIG_TYPE +); +use NGCP::BulkProcessor::Array qw(removeduplicates); +use NGCP::BulkProcessor::Utils qw(getscriptpath prompt cleanupdir); +use NGCP::BulkProcessor::Mail qw( + cleanupmsgfiles +); + +use NGCP::BulkProcessor::SqlConnectors::SQLiteDB qw(cleanupdbfiles); + +use NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers qw(); +use NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers qw(); + +use NGCP::BulkProcessor::Projects::ETL::Lnp::ProjectConnectorPool qw(destroy_all_dbs); + +use NGCP::BulkProcessor::Projects::ETL::Lnp::Import qw( + load_file +); + +use NGCP::BulkProcessor::Projects::ETL::Lnp::ProcessLnp qw( + create_lnp_numbers + delete_lnp_numbers +); + +scripterror(getscriptpath() . ' already running',getlogger(getscriptpath())) unless flock DATA, LOCK_EX | LOCK_NB; # not tested on windows yet + +my @TASK_OPTS = (); + +my $tasks = []; + +my $cleanup_task_opt = 'cleanup'; +push(@TASK_OPTS,$cleanup_task_opt); + +my $load_file_task_opt = 'load_file'; +push(@TASK_OPTS,$load_file_task_opt); + +my $create_lnp_task_opt = 'create_lnp'; +push(@TASK_OPTS,$create_lnp_task_opt); + +my $delete_lnp_task_opt = 'delete_lnp'; +push(@TASK_OPTS,$delete_lnp_task_opt); + +if (init()) { + main(); + exit(0); +} else { + exit(1); +} + +sub init { + + my $configfile = $defaultconfig; + my $settingsfile = $defaultsettings; + + return 0 unless GetOptions( + "config=s" => \$configfile, + "settings=s" => \$settingsfile, + "task=s" => $tasks, + "dry" => \$dry, + "skip-errors" => \$skip_errors, + "force" => \$force, + ); # or scripterror('error in command line arguments',getlogger(getscriptpath())); + + $tasks = removeduplicates($tasks,1); + + my $result = load_config($configfile); + init_log(); + $result &= load_config($settingsfile,\&update_settings,$YAML_CONFIG_TYPE); + return $result; + +} + +sub main() { + + my @messages = (); + my @attachmentfiles = (); + my $result = 1; + my $completion = 0; + + if (defined $tasks and 'ARRAY' eq ref $tasks and (scalar @$tasks) > 0) { + scriptinfo('skip-errors: processing won\'t stop upon errors',getlogger(__PACKAGE__)) if $skip_errors; + foreach my $task (@$tasks) { + + if (lc($cleanup_task_opt) eq lc($task)) { + $result &= cleanup_task(\@messages,1) if taskinfo($cleanup_task_opt,$result); + + } elsif (lc($load_file_task_opt) eq lc($task)) { + $result &= load_file_task(\@messages) if taskinfo($load_file_task_opt,$result); + + } elsif (lc($create_lnp_task_opt) eq lc($task)) { + if (taskinfo($create_lnp_task_opt,$result,1)) { + next unless check_dry(); + $result &= create_lnp_task(\@messages); + $completion |= 1; + } + + } elsif (lc($delete_lnp_task_opt) eq lc($task)) { + if (taskinfo($delete_lnp_task_opt,$result,1)) { + next unless check_dry(); + $result &= delete_lnp_task(\@messages); + $completion |= 1; + } + + } else { + $result = 0; + scripterror("unknown task option '" . $task . "', must be one of " . join(', ',@TASK_OPTS),getlogger(getscriptpath())); + last; + } + } + } else { + $result = 0; + scripterror('at least one task option is required. supported tasks: ' . join(', ',@TASK_OPTS),getlogger(getscriptpath())); + } + + push(@attachmentfiles,$attachmentlogfile); + if ($completion) { + completion(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + } else { + done(join("\n\n",@messages),\@attachmentfiles,getlogger(getscriptpath())); + } + + return $result; +} + +sub taskinfo { + my ($task,$result) = @_; + scriptinfo($result ? "starting task: '$task'" : "skipping task '$task' due to previous problems",getlogger(getscriptpath())); + return $result; +} + +sub cleanup_task { + my ($messages,$clean_generated) = @_; + my $result = 0; + if (!$clean_generated or $force or 'yes' eq lc(prompt("Type 'yes' to proceed: "))) { + eval { + #cleanupcvsdirs() if $clean_generated; + cleanupdbfiles() if $clean_generated; + cleanuplogfiles(\&fileerror,\&filewarn,($currentlogfile,$attachmentlogfile)); + cleanupmsgfiles(\&fileerror,\&filewarn); + #cleanupcertfiles(); + #cleanupdir($output_path,1,\&filewarn,getlogger(getscriptpath())) if $clean_generated; + $result = 1; + }; + } + if ($@ or !$result) { + #print $@; + push(@$messages,'working directory cleanup INCOMPLETE'); + return 0; + } else { + push(@$messages,'working directory folders cleaned up'); + return 1; + } +} + +sub load_file_task { + + my ($messages) = @_; + my ($result,$warning_count) = (0,0); + eval { + ($result,$warning_count) = load_file(); + }; + #print $@; + my $err = $@; + my $stats = ": $warning_count warnings"; + eval { + $stats .= "\n total file LNP records: " . + NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::countby_delta() . ' rows'; + my $added_count = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::countby_delta( + $NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::added_delta + ); + $stats .= "\n new: $added_count rows"; + my $existing_count = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::countby_delta( + $NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::updated_delta + ); + $stats .= "\n existing: $existing_count rows"; + my $deleted_count = NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::countby_delta( + $NGCP::BulkProcessor::Projects::ETL::Lnp::Dao::lnp::deleted_delta + ); + $stats .= "\n removed: $deleted_count rows"; + }; + if ($err or !$result) { + push(@$messages,"loading LNP file INCOMPLETE$stats"); + } else { + push(@$messages,"loading LNP file completed$stats"); + } + destroy_all_dbs(); #every task should leave with closed connections. + return $result; + +} + +sub create_lnp_task { + + my ($messages) = @_; + my ($result,$warning_count) = (0,0); + eval { + ($result,$warning_count) = create_lnp_numbers(); + }; + #print $@; + my $err = $@; + my $stats = ": $warning_count warnings"; + eval { + $stats .= "\n total mariadb LNP providers: " . + NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::countby_name() . ' rows'; + $stats .= "\n total mariadb LNP numbers: " . + NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::countby_lnpproviderid_number() . ' rows'; + }; + if ($err or !$result) { + push(@$messages,"creating LNP numbers INCOMPLETE$stats"); + } else { + push(@$messages,"creating LNP numbers completed$stats"); + } + destroy_all_dbs(); #every task should leave with closed connections. + return 1; #$result; + +} + +sub delete_lnp_task { + + my ($messages) = @_; + my ($result,$warning_count) = (0,0); + eval { + ($result,$warning_count) = delete_lnp_numbers(); + }; + #print $@; + my $err = $@; + my $stats = ": $warning_count warnings"; + eval { + $stats .= "\n total mariadb LNP providers: " . + NGCP::BulkProcessor::Dao::Trunk::billing::lnp_providers::countby_name() . ' rows'; + $stats .= "\n total mariadb LNP numbers: " . + NGCP::BulkProcessor::Dao::Trunk::billing::lnp_numbers::countby_lnpproviderid_number() . ' rows'; + }; + if ($err or !$result) { + push(@$messages,"deleting LNP numbers INCOMPLETE$stats"); + } else { + push(@$messages,"deleting LNP numbers completed$stats"); + } + destroy_all_dbs(); #every task should leave with closed connections. + return 1; #$result; + +} + +__DATA__ +This exists to allow the locking code at the beginning of the file to work. +DO NOT REMOVE THESE LINES! From f96c972509c34f32dada531e533ae287c00c34ad Mon Sep 17 00:00:00 2001 From: Rene Krenn Date: Mon, 1 Aug 2022 17:13:10 +0200 Subject: [PATCH 04/12] TT#175101 LNP database loader config Change-Id: I215754bfb981c0b69204422b0e6ab5cae5040f23 (cherry picked from commit 51631b7d771cdf8aed8216694a0f76316f20ba33) --- .../Projects/ETL/Lnp/settings.yml | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 lib/NGCP/BulkProcessor/Projects/ETL/Lnp/settings.yml diff --git a/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/settings.yml b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/settings.yml new file mode 100644 index 00000000..5cf97c01 --- /dev/null +++ b/lib/NGCP/BulkProcessor/Projects/ETL/Lnp/settings.yml @@ -0,0 +1,65 @@ + +#dry=0 +#skip_errors=0 + +import_multithreading: 1 +#lnp_import_numofthreads: + +sqlite_db_file: sqlite + +lnp_filename: ip_telekom.txt +#012_20190201233214.txt +#ip_telekom_delete.txt +#test.csv +lnp_rownum_start: 2 +lnp_import_single_row_txn: 0 +ignore_lnp_unique: 0 + +expand_numbers: !!perl/code | + { + my ($context,$row) = @_; + if ($row =~ /^Linha\d+=([^=]*)$/i) { + my ( + $DonorID, + $HolderID, + $TypeOfNumber, + $PABXMainTelephoneNumber, + $FirstTelephoneNumber, + $LastTelephoneNumber, + $PresentNRN, + $DateTimeFrom + ) = split(/,/,$1); + + my @lnp_numbers = (); + #020,012,0,212879000,212879000,212879999,D012001,2016-08-31 14:20:18 + unless ($PresentNRN =~ /^D012[0-9]{3,3}$/) { + foreach my $number ($FirstTelephoneNumber .. $LastTelephoneNumber) { + push(@lnp_numbers,[ + $PresentNRN, #'carrier_name', + $PresentNRN, #'carrier_prefix', + $number, #'number', + undef, #'routing_number', + $DateTimeFrom, #'start', + undef, #'end', + undef, #'authoritative', + undef, #'skip_rewrite', + undef, #'type', + ]); + } + } + return \@lnp_numbers; + } + + return []; + } + +create_lnp_multithreading: 1 +#create_lnp_numofthreads: 2 + +delete_lnp_multithreading: 1 +#delete_lnp_numofthreads: 2 + +ignore_lnp_numbers_unique: 0 +lnp_numbers_single_row_txn: 0 + +lnp_numbers_batch_delete: 1 \ No newline at end of file From c227cee296a1b64033fc43c9a7d411c68372dfcb Mon Sep 17 00:00:00 2001 From: Sipwise Jenkins Builder Date: Mon, 19 Sep 2022 14:34:45 +0200 Subject: [PATCH 05/12] Release new version 10.5.3.0+0~mr10.5.3.0 --- debian/changelog | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/debian/changelog b/debian/changelog index 98040090..b6360aa6 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,11 @@ +ngcp-bulk-processor (10.5.3.0+0~mr10.5.3.0) unstable; urgency=medium + + [ Rene Krenn ] + * [1c3478d] TT#92350 TT#175101 LNP database bulk loader + * [f96c972] TT#175101 LNP database loader config + + -- Sipwise Jenkins Builder Mon, 19 Sep 2022 14:34:45 +0200 + ngcp-bulk-processor (10.5.2.0+0~mr10.5.2.0) unstable; urgency=medium * New release. From cc5ca267ec564703bb546e2922fdc3882dd4e14c Mon Sep 17 00:00:00 2001 From: Sipwise Jenkins Builder Date: Thu, 22 Dec 2022 16:18:53 +0100 Subject: [PATCH 06/12] Release new version 10.5.4.0+0~mr10.5.4.0 --- debian/changelog | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/debian/changelog b/debian/changelog index b6360aa6..5b21bdee 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +ngcp-bulk-processor (10.5.4.0+0~mr10.5.4.0) unstable; urgency=medium + + * New release. + + -- Sipwise Jenkins Builder Thu, 22 Dec 2022 16:18:53 +0100 + ngcp-bulk-processor (10.5.3.0+0~mr10.5.3.0) unstable; urgency=medium [ Rene Krenn ] From 0d33f2aec9c8197a33e856f0fba503fcd33eaa70 Mon Sep 17 00:00:00 2001 From: Sipwise Jenkins Builder Date: Wed, 15 Mar 2023 16:07:45 +0100 Subject: [PATCH 07/12] Release new version 10.5.5.0+0~mr10.5.5.0 --- debian/changelog | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/debian/changelog b/debian/changelog index 5b21bdee..29a23644 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +ngcp-bulk-processor (10.5.5.0+0~mr10.5.5.0) unstable; urgency=medium + + * New release. + + -- Sipwise Jenkins Builder Wed, 15 Mar 2023 16:07:45 +0100 + ngcp-bulk-processor (10.5.4.0+0~mr10.5.4.0) unstable; urgency=medium * New release. From 028dd1eb97b9be1e4ce1d2be8236ca81e22a932d Mon Sep 17 00:00:00 2001 From: Sipwise Jenkins Builder Date: Mon, 24 Jul 2023 12:19:07 +0200 Subject: [PATCH 08/12] Release new version 10.5.6.0+0~mr10.5.6.0 --- debian/changelog | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/debian/changelog b/debian/changelog index 29a23644..217ad022 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +ngcp-bulk-processor (10.5.6.0+0~mr10.5.6.0) unstable; urgency=medium + + * New release. + + -- Sipwise Jenkins Builder Mon, 24 Jul 2023 12:19:07 +0200 + ngcp-bulk-processor (10.5.5.0+0~mr10.5.5.0) unstable; urgency=medium * New release. From 4d7d97c667a21af5ca290b63e88dafc5b835f49f Mon Sep 17 00:00:00 2001 From: Sipwise Jenkins Builder Date: Mon, 20 Nov 2023 16:36:29 +0100 Subject: [PATCH 09/12] Release new version 10.5.7.0+0~mr10.5.7.0 --- debian/changelog | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/debian/changelog b/debian/changelog index 217ad022..3cb967a1 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +ngcp-bulk-processor (10.5.7.0+0~mr10.5.7.0) unstable; urgency=medium + + * New release. + + -- Sipwise Jenkins Builder Mon, 20 Nov 2023 16:36:29 +0100 + ngcp-bulk-processor (10.5.6.0+0~mr10.5.6.0) unstable; urgency=medium * New release. From 184a7ba3d483e872273f20d1c915ea59039ef172 Mon Sep 17 00:00:00 2001 From: Sipwise Jenkins Builder Date: Mon, 6 May 2024 14:02:54 +0200 Subject: [PATCH 10/12] Release new version 10.5.8.0+0~mr10.5.8.0 --- debian/changelog | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/debian/changelog b/debian/changelog index 3cb967a1..b4d26f3d 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +ngcp-bulk-processor (10.5.8.0+0~mr10.5.8.0) unstable; urgency=medium + + * New release. + + -- Sipwise Jenkins Builder Mon, 06 May 2024 14:02:54 +0200 + ngcp-bulk-processor (10.5.7.0+0~mr10.5.7.0) unstable; urgency=medium * New release. From fad3038705ebcdaa70d2b26fb7c0ead13ba2f789 Mon Sep 17 00:00:00 2001 From: Sipwise Jenkins Builder Date: Wed, 16 Oct 2024 11:04:29 +0200 Subject: [PATCH 11/12] Release new version 10.5.9.0+0~mr10.5.9.0 --- debian/changelog | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/debian/changelog b/debian/changelog index b4d26f3d..9d641ada 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +ngcp-bulk-processor (10.5.9.0+0~mr10.5.9.0) unstable; urgency=medium + + * New release. + + -- Sipwise Jenkins Builder Wed, 16 Oct 2024 11:04:29 +0200 + ngcp-bulk-processor (10.5.8.0+0~mr10.5.8.0) unstable; urgency=medium * New release. From 9768fbf444008e0ea58438d7f014b85e8d630d05 Mon Sep 17 00:00:00 2001 From: Sipwise Jenkins Builder Date: Tue, 29 Jul 2025 11:30:50 +0200 Subject: [PATCH 12/12] Release new version 10.5.10.0+0~mr10.5.10.0 --- debian/changelog | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/debian/changelog b/debian/changelog index 9d641ada..99827232 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +ngcp-bulk-processor (10.5.10.0+0~mr10.5.10.0) unstable; urgency=medium + + * New release. + + -- Sipwise Jenkins Builder Tue, 29 Jul 2025 11:30:50 +0200 + ngcp-bulk-processor (10.5.9.0+0~mr10.5.9.0) unstable; urgency=medium * New release.