def run
return unless [:left, :right].any? do |database|
changes_pending = false
t = Thread.new do
changes_pending = session.send(database).select_one(
"select id from #{session.configuration.options[:rep_prefix]}_pending_changes limit 1"
) != nil
end
t.join session.configuration.options[:database_connection_timeout]
changes_pending
end
return if sweeper.terminated?
success = false
begin
replicator
loop do
begin
diff = load_difference
break unless diff.loaded?
break if sweeper.terminated?
if diff.type != :no_diff and not event_filtered?(diff)
replicator.replicate_difference diff
end
rescue Exception => e
if e.message =~ /violates foreign key constraint|foreign key constraint fails/i and !diff.second_chance?
diff.second_chance = true
second_chancers << diff
else
begin
helper.log_replication_outcome diff, e.message,
e.class.to_s + "\n" + e.backtrace.join("\n")
rescue Exception => _
raise e
end
end
end
end
success = true
ensure
if sweeper.terminated?
helper.finalize false
session.disconnect_databases
else
helper.finalize success
end
end
end