Class: Triglav::Agent::Base::Processor
- Inherits:
-
Object
- Object
- Triglav::Agent::Base::Processor
- Defined in:
- lib/triglav/agent/base/processor.rb
Overview
Triglav agent processor class.
An instance is created for a `resource_uri_prefix`.
You usually do not need to customize this class, but if you want to implement your original, configure
Triglav::Agent::Configuration.processor_class
Instance Attribute Summary collapse
-
#resource_uri_prefix ⇒ Object
readonly
Returns the value of attribute resource_uri_prefix.
-
#worker ⇒ Object
readonly
Returns the value of attribute worker.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(worker, resource_uri_prefix) ⇒ Processor
constructor
A new instance of Processor.
- #process ⇒ Object
- #total_count ⇒ Object
Constructor Details
#initialize(worker, resource_uri_prefix) ⇒ Processor
Returns a new instance of Processor
18 19 20 21 |
# File 'lib/triglav/agent/base/processor.rb', line 18 def initialize(worker, resource_uri_prefix) @worker = worker @resource_uri_prefix = resource_uri_prefix end |
Instance Attribute Details
#resource_uri_prefix ⇒ Object (readonly)
Returns the value of attribute resource_uri_prefix
16 17 18 |
# File 'lib/triglav/agent/base/processor.rb', line 16 def resource_uri_prefix @resource_uri_prefix end |
#worker ⇒ Object (readonly)
Returns the value of attribute worker
16 17 18 |
# File 'lib/triglav/agent/base/processor.rb', line 16 def worker @worker end |
Class Method Details
.max_consecuitive_error_count ⇒ Object
23 24 25 |
# File 'lib/triglav/agent/base/processor.rb', line 23 def self.max_consecuitive_error_count 3 end |
Instance Method Details
#process ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/triglav/agent/base/processor.rb', line 27 def process before_process success_count = 0 consecutive_error_count = 0 Parallel.each(resources, parallel_opts) do |resource| raise Parallel::Break if stopped? events = nil begin @connection_pool.with do |connection| monitor = monitor_class.new(connection, resource_uri_prefix, resource) monitor.process do |_events| events = _events $logger.info { "send_messages:#{events.map(&:to_hash).to_json}" } @api_client_pool.with {|api_client| api_client.(events) } end end @mutex.synchronize do success_count += 1 consecutive_error_count = 0 end rescue => e log_error(e) $logger.info { "failed_events:#{events.map(&:to_hash).to_json}" } if events @mutex.synchronize do raise TooManyError if (consecutive_error_count += 1) > self.class.max_consecuitive_error_count end end end success_count ensure after_process end |
#total_count ⇒ Object
60 61 62 |
# File 'lib/triglav/agent/base/processor.rb', line 60 def total_count resources.size end |