Skip to content

Commit 73acd8c

Browse files
bremlSuyog Rao
authored andcommitted
Wait for client socket to appear (#19)
* Wait for client socket to appear More resilent in case of errors (handle error conditions). * Configurable socket not present retry interval if `mode` is `client`
1 parent 1719ad5 commit 73acd8c

File tree

2 files changed

+35
-6
lines changed

2 files changed

+35
-6
lines changed

lib/logstash/inputs/unix.rb

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ class Interrupted < StandardError; end
3232
# `client` connects to a server.
3333
config :mode, :validate => ["server", "client"], :default => "server"
3434

35+
# Amount of time in seconds to wait if the socket file is not present, before retrying.
36+
# Only positive values are allowed.
37+
#
38+
# This setting is only used if `mode` is `client`.
39+
config :socket_not_present_retry_interval_seconds, :validate => :number, :required => true, :default => 5
40+
3541
def initialize(*args)
3642
super(*args)
3743
end # def initialize
@@ -61,6 +67,11 @@ def register
6167
:path => @path)
6268
raise
6369
end
70+
else # client
71+
if @socket_not_present_retry_interval_seconds < 0
72+
@logger.warn("Value #{@socket_not_present_retry_interval_seconds} for socket_not_present_retry_interval_seconds is not valid, using default value of 5 instead")
73+
@socket_not_present_retry_interval_seconds = 5
74+
end
6475
end
6576
end # def register
6677

@@ -119,10 +130,15 @@ def run(output_queue)
119130
end
120131
else
121132
while !stop?
122-
@client_socket = UNIXSocket.new(@path)
123-
@client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
124-
@logger.debug("Opened connection", :client => @path)
125-
handle_socket(@client_socket, output_queue)
133+
if File.socket?(@path) then
134+
@client_socket = UNIXSocket.new(@path)
135+
@client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
136+
@logger.debug("Opened connection", :client => @path)
137+
handle_socket(@client_socket, output_queue)
138+
else
139+
@logger.warn("Socket not present, wait for #{@subscription_retry_interval_seconds} seconds for socket to appear", :client => @path)
140+
sleep @socket_not_present_retry_interval_seconds
141+
end
126142
end
127143
end
128144
rescue IOError
@@ -135,9 +151,9 @@ def run(output_queue)
135151
def stop
136152
if server?
137153
File.unlink(@path)
138-
@server_socket.close
154+
@server_socket.close unless @server_socket.nil?
139155
else
140-
@client_socket.close
156+
@client_socket.close unless @client_socket.nil?
141157
end
142158
rescue IOError
143159
# if socket with @mode == client was closed by the client, an other call to @client_socket.close

spec/inputs/unix_spec.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,19 @@
1212
expect { plugin.register }.to_not raise_error
1313
end
1414

15+
describe "when mode is client" do
16+
17+
let(:mode) { "client" }
18+
19+
context "if socket_not_present_retry_interval_seconds is out of bounds" do
20+
it "should fallback to default value" do
21+
plugin = LogStash::Plugin.lookup("input", "unix").new({ "path" => tempfile.path, "force_unlink" => true, "mode" => mode, "socket_not_present_retry_interval_seconds" => -1 })
22+
plugin.register
23+
expect(plugin.instance_variable_get(:@socket_not_present_retry_interval_seconds)).to be 5
24+
end
25+
end
26+
end
27+
1528
describe "when interrupting the plugin" do
1629

1730
context "#server" do

0 commit comments

Comments
 (0)