diff --git a/docs/index.asciidoc b/docs/index.asciidoc index c5ac262..e260181 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -45,7 +45,9 @@ This plugin supports the following configuration options plus the <> |<>|Yes | <> |<>|No | <> |<>|No +| <> |<>|No | <> |<>|No +| <> |<>|No | <> |<>|Yes | <> |<>|No |======================================================================= @@ -56,7 +58,7 @@ input plugins.   [id="plugins-{type}s-{plugin}-check_interval"] -===== `check_interval` +===== `check_interval` * Value type is <> * Default value is `300` @@ -64,7 +66,7 @@ input plugins. [id="plugins-{type}s-{plugin}-content_type"] -===== `content_type` +===== `content_type` * Value type is <> * Default value is `"text/plain"` @@ -73,7 +75,7 @@ For multipart messages, use the first part that has this content-type as the event message. [id="plugins-{type}s-{plugin}-delete"] -===== `delete` +===== `delete` * Value type is <> * Default value is `false` @@ -81,7 +83,7 @@ content-type as the event message. [id="plugins-{type}s-{plugin}-expunge"] -===== `expunge` +===== `expunge` * Value type is <> * Default value is `false` @@ -89,7 +91,7 @@ content-type as the event message. [id="plugins-{type}s-{plugin}-fetch_count"] -===== `fetch_count` +===== `fetch_count` * Value type is <> * Default value is `50` @@ -97,7 +99,7 @@ content-type as the event message. [id="plugins-{type}s-{plugin}-folder"] -===== `folder` +===== `folder` * Value type is <> * Default value is `"INBOX"` @@ -105,7 +107,7 @@ content-type as the event message. [id="plugins-{type}s-{plugin}-host"] -===== `host` +===== `host` * This is a required setting. * Value type is <> @@ -114,7 +116,7 @@ content-type as the event message. [id="plugins-{type}s-{plugin}-lowercase_headers"] -===== `lowercase_headers` +===== `lowercase_headers` * Value type is <> * Default value is `true` @@ -122,7 +124,7 @@ content-type as the event message. 
[id="plugins-{type}s-{plugin}-password"] -===== `password` +===== `password` * This is a required setting. * Value type is <> @@ -131,7 +133,7 @@ content-type as the event message. [id="plugins-{type}s-{plugin}-port"] -===== `port` +===== `port` * Value type is <> * There is no default value for this setting. @@ -139,23 +141,52 @@ content-type as the event message. [id="plugins-{type}s-{plugin}-secure"] -===== `secure` +===== `secure` * Value type is <> * Default value is `true` +[id="plugins-{type}s-{plugin}-sincedb_path"] +===== `sincedb_path` + + * Value type is <<string,string>> + * There is no default value for this setting. + +Path of the sincedb database file (keeps track of the UID of the last processed +mail) that will be written to disk. The default will write the sincedb file to the +`<path.data>/plugins/inputs/imap` directory. +NOTE: it must be a file path and not a directory path. + [id="plugins-{type}s-{plugin}-strip_attachments"] -===== `strip_attachments` +===== `strip_attachments` * Value type is <> * Default value is `false` +[id="plugins-{type}s-{plugin}-uid_tracking"] +===== `uid_tracking` + + * Value type is <<boolean,boolean>> + * Default value is `false` + +When the IMAP input plugin connects to the mailbox for the first time and +the UID of the last processed mail is not yet known, the unread mails are +first downloaded and the UID of the last processed mail is saved. From +this point on, if `uid_tracking` is set to `true`, all new mails will be +downloaded regardless of whether they are marked as read or unread. This +allows users or other services to use the mailbox simultaneously with the +IMAP input plugin. The UID of the last processed mail is always saved regardless +of the `uid_tracking` value, so you can switch its value as needed. When +transitioning from the previous IMAP input plugin version, first process at least +one mail with `uid_tracking` set to `false` to save the UID of the last +processed mail and then switch `uid_tracking` to `true`. 
+ [id="plugins-{type}s-{plugin}-user"] -===== `user` +===== `user` * This is a required setting. * Value type is <> @@ -164,7 +195,7 @@ content-type as the event message. [id="plugins-{type}s-{plugin}-verify_cert"] -===== `verify_cert` +===== `verify_cert` * Value type is <> * Default value is `true` @@ -176,4 +207,4 @@ content-type as the event message. [id="plugins-{type}s-{plugin}-common-options"] include::{include_path}/{type}.asciidoc[] -:default_codec!: \ No newline at end of file +:default_codec!: diff --git a/lib/logstash/inputs/imap.rb b/lib/logstash/inputs/imap.rb index d449957..1b909a2 100644 --- a/lib/logstash/inputs/imap.rb +++ b/lib/logstash/inputs/imap.rb @@ -29,11 +29,17 @@ class LogStash::Inputs::IMAP < LogStash::Inputs::Base config :delete, :validate => :boolean, :default => false config :expunge, :validate => :boolean, :default => false config :strip_attachments, :validate => :boolean, :default => false - + # For multipart messages, use the first part that has this # content-type as the event message. config :content_type, :validate => :string, :default => "text/plain" + # Whether to use IMAP uid to track last processed message + config :uid_tracking, :validate => :boolean, :default => false + + # Path to file with last run time metadata + config :sincedb_path, :validate => :string, :required => false + def register require "net/imap" # in stdlib require "mail" # gem 'mail' @@ -50,6 +56,22 @@ def register end end + # Load last processed IMAP uid from file if exists + if @sincedb_path.nil? + datapath = File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "imap") + # Ensure that the filepath exists before writing, since it's deeply nested. 
+ FileUtils::mkdir_p datapath + @sincedb_path = File.join(datapath, ".sincedb_" + Digest::MD5.hexdigest("#{@user}_#{@host}_#{@port}_#{@folder}")) + end + if File.directory?(@sincedb_path) + raise ArgumentError.new("The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\"") + end + @logger.info("Using \"sincedb_path\": \"#{@sincedb_path}\"") + if File.exist?(@sincedb_path) + @uid_last_value = File.read(@sincedb_path).to_i + @logger.info("Loading \"uid_last_value\": \"#{@uid_last_value}\"") + end + @content_type_re = Regexp.new("^" + @content_type) end # def register @@ -75,34 +97,56 @@ def check_mail(queue) # EOFError, OpenSSL::SSL::SSLError imap = connect imap.select(@folder) - ids = imap.search("NOT SEEN") + if @uid_tracking && @uid_last_value + # If there are no new messages, uid_search returns @uid_last_value + # because it is the last message, so we need to delete it. + ids = imap.uid_search(["UID", (@uid_last_value+1..-1)]).delete_if { |uid| + uid <= @uid_last_value + } + else + ids = imap.uid_search("NOT SEEN") + end ids.each_slice(@fetch_count) do |id_set| - items = imap.fetch(id_set, "RFC822") + items = imap.uid_fetch(id_set, ["BODY.PEEK[]", "UID"]) items.each do |item| - next unless item.attr.has_key?("RFC822") - mail = Mail.read_from_string(item.attr["RFC822"]) + next unless item.attr.has_key?("BODY[]") + mail = Mail.read_from_string(item.attr["BODY[]"]) if @strip_attachments queue << parse_mail(mail.without_attachments!) else queue << parse_mail(mail) end + # Mark message as processed + @uid_last_value = item.attr["UID"] + imap.uid_store(@uid_last_value, '+FLAGS', @delete || @expunge ? :Deleted : :Seen) + + # Stop message processing if it is requested + break if stop? end - imap.store(id_set, '+FLAGS', @delete ? 
:Deleted : :Seen) - - end + # Expunge deleted messages + imap.expunge() if @expunge - # Enable an 'expunge' IMAP command after the items.each loop - if @expunge - # Force messages to be marked as "Deleted", the above may or may not be working as expected. "Seen" means nothing if you are going to - # delete a message after processing. - imap.store(id_set, '+FLAGS', [:Deleted]) - imap.expunge() + # Stop message fetching if it is requested + break if stop? end - imap.close - imap.disconnect + rescue => e + @logger.error("Encountered error #{e.class}", :message => e.message, :backtrace => e.backtrace) + # Do not raise error, check_mail will be invoked in the next run time + + ensure + # Close the connection (and ignore errors) + imap.close rescue nil + imap.disconnect rescue nil + + # Always save @uid_last_value so when tracking is switched from + # "NOT SEEN" to "UID" we will continue from first unprocessed message + if @uid_last_value + @logger.info("Saving \"uid_last_value\": \"#{@uid_last_value}\"") + File.write(@sincedb_path, @uid_last_value) + end end def parse_mail(mail) diff --git a/spec/inputs/imap_spec.rb b/spec/inputs/imap_spec.rb index ed1bb48..647791f 100644 --- a/spec/inputs/imap_spec.rb +++ b/spec/inputs/imap_spec.rb @@ -25,7 +25,7 @@ allow(imap).to receive(:store) allow(ids).to receive(:each_slice).and_return([]) - allow(imap).to receive(:search).with("NOT SEEN").and_return(ids) + allow(imap).to receive(:uid_search).with("NOT SEEN").and_return(ids) allow(Net::IMAP).to receive(:new).and_return(imap) end end